/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @addtogroup FFRT
 * @{
 *
 * @brief Provides FFRT C++ APIs.
 *
 * @since 20
 */

/**
 * @file job_utils.h
 *
 * @brief Declares utilities for job scheduling, synchronization, and fiber management in FFRT.
 *
 * @library libffrt.z.so
 * @kit FunctionFlowRuntimeKit
 * @syscap SystemCapability.Resourceschedule.Ffrt.Core
 * @since 20
 */

#ifndef FFRT_JOB_UTILS_H
#define FFRT_JOB_UTILS_H
#include <cstdint>
#include <climits>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <unistd.h>
#include <sys/syscall.h>
#include <functional>
#include <string>
#include <thread>
#include <chrono>
#include <atomic>
#include <linux/futex.h>
#include "c/fiber.h"

#ifndef FFRT_API_LOGE
#define FFRT_API_LOGE(fmt, ...)
#endif
#ifndef FFRT_API_LOGD
#define FFRT_API_LOGD(fmt, ...)
#endif
#ifndef FFRT_API_TRACE_INT64
#define FFRT_API_TRACE_INT64(name, value)
#endif
#ifndef FFRT_API_TRACE_SCOPE
#define FFRT_API_TRACE_SCOPE(fmt, ...)
#endif

namespace ffrt {

/**
 * @namespace ffrt::detail
 * @brief Internal implementation details for FFRT utilities.
 */
namespace detail {
/**
 * @brief Cache line size constant.
 */
static constexpr uint64_t cacheline_size = 64;

/**
 * @brief Non-copyable base class. Inherit from this to prevent copy and assignment.
 */
struct non_copyable {
protected:
    non_copyable() = default;
    ~non_copyable() = default;
    non_copyable(const non_copyable&) = delete;
    non_copyable& operator=(const non_copyable&) = delete;
};
} // namespace detail

/**
 * @brief Wait for event (WFE) instruction for ARM architectures; a no-op elsewhere.
 */
static inline void wfe()
{
#if (defined __aarch64__ || defined __arm__)
    __asm__ volatile("wfe" : : : "memory");
#endif
}

/**
 * @brief Rounds a value up to the next power of two.
 *
 * @param x Input value.
 * @return The smallest power of two greater than or equal to x (1 for x == 0).
 */
static inline constexpr uint64_t align2n(uint64_t x)
{
    uint64_t i = 1;
    uint64_t t = x;
    while (x >>= 1) {
        i <<= 1;
    }
    return (i < t) ? (i << 1) : i;
}
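
/*
 * A quick sanity check of align2n's rounding (values verified against the
 * implementation above):
 *
 *   static_assert(ffrt::align2n(1) == 1);
 *   static_assert(ffrt::align2n(4) == 4);    // powers of two are unchanged
 *   static_assert(ffrt::align2n(5) == 8);    // other values round up
 *   static_assert(ffrt::align2n(1000) == 1024);
 */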

/**
 * @struct futex
 * @brief Futex-based synchronization primitives.
 * @details Provides wait and wake operations using the Linux futex syscall.
 */
struct futex {
    /**
     * @brief Blocks the caller while the value at uaddr still equals val; returns when woken,
     * or immediately if the value already differs.
     *
     * @param uaddr Address to wait on.
     * @param val Expected value.
     */
    static inline void wait(int* uaddr, int val)
    {
        FFRT_API_LOGD("futex wait in %p", uaddr);
        int r = call(uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, 0);
        (void)(r);
        FFRT_API_LOGD("futex wait %p ret %d", uaddr, r);
    }

    /**
     * @brief Wakes up threads waiting on a futex address.
     *
     * @param uaddr Address to wake.
     * @param num Maximum number of threads to wake.
     */
    static inline void wake(int* uaddr, int num)
    {
        int r = call(uaddr, FUTEX_WAKE_PRIVATE, num, nullptr, 0);
        (void)(r);
        FFRT_API_LOGD("futex wake %p ret %d", uaddr, r);
    }

private:
    /**
     * @brief Internal futex syscall wrapper.
     */
    static inline int call(int* uaddr, int op, int val, const struct timespec* timeout, int bitset)
    {
        return syscall(SYS_futex, uaddr, op, val, timeout, nullptr, bitset);
    }
};
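
/*
 * Minimal usage sketch of the futex wrapper (illustrative only; `flag` is a
 * hypothetical variable, not part of this header). One thread sleeps while the
 * flag still holds its old value; another changes it and wakes the sleeper:
 *
 *   static int flag = 0;
 *
 *   // waiter: returns immediately if flag != 0, otherwise sleeps until woken
 *   while (__atomic_load_n(&flag, __ATOMIC_ACQUIRE) == 0) {
 *       ffrt::futex::wait(&flag, 0);
 *   }
 *
 *   // signaler
 *   __atomic_store_n(&flag, 1, __ATOMIC_RELEASE);
 *   ffrt::futex::wake(&flag, 1);
 */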

/**
 * @struct atomic_wait
 * @brief Atomic integer with futex-based wait/notify.
 * @details Extends std::atomic<int> to support futex-based waiting and notification.
 */
struct atomic_wait : std::atomic<int> {
    using std::atomic<int>::atomic;
    using std::atomic<int>::operator=;

    /**
     * @brief Blocks the calling thread while the atomic value still equals val.
     *
     * @param val Expected value.
     */
    inline void wait(int val)
    {
        futex::wait(reinterpret_cast<int*>(this), val);
    }

    /**
     * @brief Notifies one waiting thread.
     */
    inline void notify_one()
    {
        futex::wake(reinterpret_cast<int*>(this), 1);
    }

    /**
     * @brief Notifies all waiting threads.
     */
    inline void notify_all()
    {
        futex::wake(reinterpret_cast<int*>(this), INT_MAX);
    }
};
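
/*
 * Usage sketch: a one-shot event built on atomic_wait (illustrative; `ready`
 * is a hypothetical variable). The usual futex caveat applies: wait() may
 * return spuriously, so callers should re-check the value in a loop.
 *
 *   ffrt::atomic_wait ready{0};
 *
 *   // waiting thread
 *   while (ready.load(std::memory_order_acquire) == 0) {
 *       ready.wait(0);
 *   }
 *
 *   // signaling thread
 *   ready.store(1, std::memory_order_release);
 *   ready.notify_all();
 */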

/**
 * @struct ref_obj
 * @brief Reference-counted object base class (intrusive count, initialized to 1).
 *
 * @tparam T Object type.
 */
template <class T>
struct ref_obj {
    /**
     * @struct ptr
     * @brief Smart pointer for reference-counted objects.
     *
     * Provides automatic reference counting and resource management for objects derived from ref_obj.
     */
    struct ptr {
        /**
         * @brief Default constructor. Initializes as a null pointer.
         */
        ptr() {}

        /**
         * @brief Constructs from a raw pointer, taking ownership of one reference.
         *
         * @param p Raw pointer to the managed object.
         */
        ptr(void* p) : p(static_cast<T*>(p)) {}

        /**
         * @brief Destructor. Decreases the reference count and deletes the object if it reaches zero.
         */
        ~ptr()
        {
            reset();
        }

        /**
         * @brief Copy constructor. Increases the reference count.
         *
         * @param h The smart pointer to copy from.
         */
        inline ptr(ptr const& h)
        {
            *this = h;
        }

        /**
         * @brief Copy assignment operator. Releases the currently held object, then shares
         * ownership of h's object and increases its reference count.
         *
         * @param h The smart pointer to assign from.
         * @return Reference to this pointer.
         */
        inline ptr& operator=(ptr const& h)
        {
            if (this != &h) {
                if (p) {
                    p->dec_ref(); // release the previous object to avoid leaking a reference
                }
                p = h.p;
                if (p) {
                    p->inc_ref();
                }
            }
            return *this;
        }

        /**
         * @brief Move constructor. Transfers ownership without changing the reference count.
         *
         * @param h The smart pointer to move from.
         */
        inline ptr(ptr&& h)
        {
            *this = std::move(h);
        }

        /**
         * @brief Move assignment operator. Releases the currently held object and transfers
         * ownership without changing the reference count.
         *
         * @param h The smart pointer to move from.
         * @return Reference to this pointer.
         */
        inline ptr& operator=(ptr&& h)
        {
            if (this != &h) {
                if (p) {
                    p->dec_ref();
                }
                p = h.p;
                h.p = nullptr;
            }
            return *this;
        }

        /**
         * @brief Returns the raw pointer to the managed object.
         *
         * @return Raw pointer to the object.
         */
        constexpr inline T* get()
        {
            return p;
        }

        /**
         * @brief Returns the raw pointer to the managed object (const version).
         *
         * @return Const raw pointer to the object.
         */
        constexpr inline const T* get() const
        {
            return p;
        }

        /**
         * @brief Arrow operator for member access.
         *
         * @return Raw pointer to the object.
         */
        constexpr inline T* operator->()
        {
            return p;
        }

        /**
         * @brief Arrow operator for member access (const version).
         *
         * @return Const raw pointer to the object.
         */
        constexpr inline const T* operator->() const
        {
            return p;
        }

        /**
         * @brief Conversion operator to void pointer.
         *
         * @return Raw pointer as void*.
         */
        inline operator void* () const
        {
            return p;
        }

        /**
         * @brief Releases the owned object: decreases the reference count and resets to null.
         */
        inline void reset()
        {
            if (p) {
                p->dec_ref();
                p = nullptr;
            }
        }

    private:
        T* p = nullptr; ///< Raw pointer to the managed reference-counted object.
    };

    /**
     * @brief Creates a new reference-counted object wrapped in a ptr.
     */
    template<class... Args>
    static inline ptr make(Args&& ... args)
    {
        auto p = new T(std::forward<Args>(args)...);
        FFRT_API_LOGD("%s new %p", __PRETTY_FUNCTION__, p);
        return ptr(p);
    }

    /**
     * @brief Returns a process-wide singleton instance, created on first use.
     */
    template<class... Args>
    static ptr& singleton(Args&& ... args)
    {
        static ptr s = make(std::forward<Args>(args)...);
        return s;
    }

    /**
     * @brief Increments the reference count.
     */
    inline void inc_ref()
    {
        ref.fetch_add(1, std::memory_order_relaxed);
    }

    /**
     * @brief Decrements the reference count and deletes the object when it reaches zero.
     */
    inline void dec_ref()
    {
        if (ref.fetch_sub(1, std::memory_order_relaxed) == 1) {
            FFRT_API_LOGD("%s delete %p", __PRETTY_FUNCTION__, this);
            delete static_cast<T*>(this);
        }
    }

private:
    std::atomic_uint64_t ref{1}; ///< Atomic reference counter managing the object's lifetime.
};
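
/*
 * Usage sketch for ref_obj (illustrative; `Connection` is a hypothetical type).
 * The type derives from ref_obj<itself> (CRTP) so dec_ref() can delete through
 * the correct type, and instances are created via make() rather than new:
 *
 *   struct Connection : ffrt::ref_obj<Connection> {
 *       explicit Connection(int fd) : fd(fd) {}
 *       int fd;
 *   };
 *
 *   {
 *       auto c = ffrt::ref_obj<Connection>::make(42); // refcount == 1
 *       auto d = c;                                   // refcount == 2
 *       FFRT_API_LOGD("fd %d", d->fd);
 *   }   // both ptr destructors run; the Connection is deleted
 */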

/**
 * @struct mpmc_queue
 * @brief Lock-free, bounded multi-producer multi-consumer queue.
 *
 * @tparam T Element type.
 */
template <typename T>
struct mpmc_queue : detail::non_copyable {
    /**
     * @brief Constructs a queue with the given capacity, rounded up to a power of two.
     *
     * @param cap Requested capacity of the queue.
     */
    mpmc_queue(uint64_t cap) : capacity(align2n(cap)), mask(capacity - 1)
    {
        if constexpr (std::is_pod_v<Item>) {
            q = static_cast<Item*>(malloc(capacity * sizeof(Item)));
        } else {
            q = new Item[capacity];
        }
        for (size_t i = 0; i < capacity; ++i) {
            q[i].iwrite_exp.store(i, std::memory_order_relaxed);
            q[i].iread_exp.store(-1, std::memory_order_relaxed);
        }

        iwrite_.store(0, std::memory_order_relaxed);
        iread_.store(0, std::memory_order_relaxed);
    }

    /**
     * @brief Destructor.
     */
    ~mpmc_queue()
    {
        if constexpr (std::is_pod_v<Item>) {
            free(q);
        } else {
            delete [] q;
        }
    }

    /**
     * @brief Returns the current number of elements in the queue.
     */
    inline uint64_t size() const
    {
        auto head = iread_.load(std::memory_order_relaxed);
        return iwrite_.load(std::memory_order_relaxed) - head;
    }

    /**
     * @brief Attempts to push an element into the queue.
     *
     * @param data Element to push.
     * @return True if successful, false if the queue is full.
     */
    bool try_push(const T& data)
    {
        Item* i;
        auto iwrite = iwrite_.load(std::memory_order_relaxed);
        for (;;) {
            i = &q[iwrite & mask];
            if (i->iwrite_exp.load(std::memory_order_relaxed) != iwrite) {
                return false;
            }
            if (iwrite_.compare_exchange_weak(iwrite, iwrite + 1, std::memory_order_relaxed)) {
                break;
            }
        }
        i->data = data;
        i->iread_exp.store(iwrite, std::memory_order_release);
        return true;
    }

    /**
     * @brief Attempts to pop an element from the queue.
     *
     * @param result Output parameter for the popped element.
     * @return True if successful, false if the queue is empty.
     */
    bool try_pop(T& result)
    {
        Item* i;
        auto iread = iread_.load(std::memory_order_relaxed);
        for (;;) {
            i = &q[iread & mask];
            if (i->iread_exp.load(std::memory_order_relaxed) != iread) {
                return false;
            }
            if (iread_.compare_exchange_weak(iread, iread + 1, std::memory_order_relaxed)) {
                break;
            }
        }
        result = i->data;
        i->iwrite_exp.store(iread + capacity, std::memory_order_release);
        return true;
    }

private:
    uint64_t capacity; ///< Capacity of the queue (always a power of two).
    uint64_t mask;     ///< Bitmask for index wrapping (capacity - 1).

    /**
     * @brief Internal structure representing a queue slot.
     */
    struct Item {
        T data;                           ///< Data stored in the queue slot.
        std::atomic<uint64_t> iwrite_exp; ///< Write index at which this slot next accepts a push.
        std::atomic<uint64_t> iread_exp;  ///< Read index at which this slot next allows a pop.
    };

    alignas(detail::cacheline_size) Item* q;                       ///< Pointer to the array of queue slots.
    alignas(detail::cacheline_size) std::atomic<uint64_t> iwrite_; ///< Global write index for the queue.
    alignas(detail::cacheline_size) std::atomic<uint64_t> iread_;  ///< Global read index for the queue.
};
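
/*
 * Usage sketch for mpmc_queue (illustrative). Capacity is rounded up to a
 * power of two, and both ends are non-blocking, so callers decide how to
 * handle a full or empty queue; `consume` is a placeholder for caller code:
 *
 *   ffrt::mpmc_queue<int> q(100); // actual capacity: 128
 *
 *   if (!q.try_push(7)) {
 *       // queue full: retry, drop, or back off
 *   }
 *
 *   int v;
 *   while (q.try_pop(v)) {
 *       consume(v);
 *   }
 */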

/**
 * @brief Function pointer type for tasks.
 */
using func_ptr = void(*)(void*);

/**
 * @brief Structure representing a pointer-based task.
 */
struct ptr_task {
    func_ptr f; ///< Function pointer.
    void* arg;  ///< Argument pointer.
};

/**
 * @struct runnable_queue
 * @brief Runnable task queue built on a given queue type.
 *
 * @tparam Queue Queue template type.
 */
template<template<class> class Queue>
struct runnable_queue : Queue<ptr_task> {
    /**
     * @brief Constructs a runnable queue.
     *
     * @param depth Queue depth.
     * @param name Queue name.
     */
    runnable_queue(uint64_t depth, const std::string& name) : Queue<ptr_task>(depth), name(name) {}

    /**
     * @brief Attempts to pop and run one task from the queue.
     *
     * @return True if a task was run, false if the queue was empty.
     */
    inline bool try_run()
    {
        ptr_task job;
        auto suc = this->try_pop(job);
        if (!suc) {
            return false;
        }

        FFRT_API_TRACE_INT64(name.c_str(), this->size());
        job.f(job.arg);
        return true;
    }

    /**
     * @brief Pushes a task into the queue, retrying with the given policy while the queue is full.
     *
     * @tparam policy Full-queue policy (0: back off with exponentially growing sleeps,
     * 1: run queued tasks inline to drain the queue).
     * @param f Function pointer.
     * @param p Argument pointer.
     */
    template <int policy>
    inline void push(func_ptr f, void* p)
    {
        uint64_t us = 1;
        while (!this->try_push({f, p})) {
            if constexpr (policy == 0) {
                std::this_thread::sleep_for(std::chrono::microseconds(us));
                us = us << 1;
            } else if constexpr (policy == 1) {
                try_run();
            }
        }
        FFRT_API_TRACE_INT64(name.c_str(), this->size());
    }

    const std::string name; ///< Queue name.
};
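
/*
 * Usage sketch for runnable_queue (illustrative; `step` is a hypothetical
 * function). Producers push ptr_task entries; workers drain them with
 * try_run(). Policy 1 makes a producer help execute tasks when the queue
 * is full instead of sleeping:
 *
 *   static void step(void* arg) { FFRT_API_LOGD("arg %p", arg); }
 *
 *   ffrt::runnable_queue<ffrt::mpmc_queue> rq(256, "demo_q");
 *   rq.push<1>(step, nullptr); // enqueue; runs tasks inline if full
 *   while (rq.try_run()) {}    // worker loop: drain the queue
 */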

/**
 * @struct clock
 * @brief High-resolution clock utilities.
 */
struct clock {
    using stamp = std::chrono::time_point<std::chrono::high_resolution_clock>;

    /**
     * @brief Returns the current time stamp.
     */
    static inline stamp now()
    {
        return std::chrono::high_resolution_clock::now();
    }

    /**
     * @brief Returns the nanoseconds between two time stamps.
     *
     * @param from Start time.
     * @param to End time (default: now).
     * @return Nanoseconds between from and to.
     */
    static inline uint64_t ns(const stamp& from, stamp to = now())
    {
        return uint64_t(std::chrono::duration_cast<std::chrono::nanoseconds>(to - from).count());
    }
};
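
/*
 * Usage sketch for clock: measure the duration of a code section in
 * nanoseconds (illustrative; `work` is a placeholder):
 *
 *   auto t0 = ffrt::clock::now();
 *   work();
 *   FFRT_API_LOGD("took %lu ns", ffrt::clock::ns(t0)); // `to` defaults to now()
 */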

/**
 * @struct fiber
 * @brief Lightweight fiber implementation.
 *
 * @tparam UsageId Usage identifier.
 * @tparam FiberLocal Type for fiber-local storage.
 * @tparam ThreadLocal Type for thread-local storage.
 */
template <int UsageId = 0, class FiberLocal = char, class ThreadLocal = char>
struct fiber : detail::non_copyable {
    /**
     * @brief Thread environment for fiber execution.
     */
    struct thread_env : detail::non_copyable {
        fiber* cur = nullptr;          ///< Fiber currently running on this thread.
        bool (*cond)(void*) = nullptr; ///< Resume condition set by a suspended fiber.
        ThreadLocal tl;                ///< Thread-local storage.
    };

    /**
     * @brief Returns the thread-local environment.
     */
    static __attribute__((noinline)) thread_env& env()
    {
        static thread_local thread_env ctx;
        return ctx;
    }

    /**
     * @brief Initializes a fiber with a function, placing the fiber object at the head of the given stack.
     *
     * @param f Function to run.
     * @param stack Stack memory.
     * @param stack_size Stack size in bytes.
     * @return Pointer to the created fiber, or nullptr on failure.
     */
    static fiber* init(std::function<void()>&& f, void* stack, size_t stack_size)
    {
        if (stack == nullptr || stack_size < sizeof(fiber) + min_stack_size) {
            return nullptr;
        }
        auto c = new (stack) fiber(std::forward<std::function<void()>>(f));
        if (ffrt_fiber_init(&c->fb, reinterpret_cast<void(*)(void*)>(fiber_entry), c,
            static_cast<char*>(stack) + sizeof(fiber), stack_size - sizeof(fiber))) {
            c->~fiber<UsageId, FiberLocal, ThreadLocal>();
            return nullptr;
        }

        FFRT_API_LOGD("job %lu create", c->id);
        return c;
    }

    /**
     * @brief Destroys the fiber. The caller retains ownership of the stack memory.
     */
    inline void destroy()
    {
        FFRT_API_LOGD("job %lu destroy", id);
        this->~fiber<UsageId, FiberLocal, ThreadLocal>();
    }

    /**
     * @brief Runs the fiber on the calling thread until it finishes or suspends.
     *
     * @return True if the fiber finished, false if it suspended.
     */
    bool start()
    {
        bool done;
        auto& e = fiber::env();

        do {
            e.cond = nullptr;
            e.cur = this;
            FFRT_API_LOGD("job %lu switch in", id);
            ffrt_fiber_switch(&link, &fb);
            FFRT_API_LOGD("job %lu switch out", id);
            done = this->done;
        } while (e.cond && !(e.cond)(this));
        e.cond = nullptr;
        return done;
    }

    /**
     * @brief Suspends the current fiber, switching back to its caller.
     *
     * @tparam is_final Whether this is the final suspension (the fiber is done).
     * @param e Thread environment.
     * @param cond Optional condition; the caller resumes the fiber immediately unless it holds.
     */
    template<bool is_final = false>
    static inline void suspend(thread_env& e, bool (*cond)(void*) = nullptr)
    {
        auto j = e.cur;
        if constexpr (is_final) {
            j->done = true;
        } else {
            e.cond = cond;
        }
        e.cur = nullptr;

        ffrt_fiber_switch(&j->fb, &j->link);
    }

    /**
     * @brief Suspends the current fiber using the calling thread's environment.
     *
     * @tparam is_final Whether this is the final suspension.
     * @param cond Optional condition function.
     */
    template<bool is_final = false>
    static inline void suspend(bool (*cond)(void*) = nullptr)
    {
        suspend<is_final>(fiber::env(), cond);
    }

    /**
     * @brief Returns the fiber-local storage.
     */
    FiberLocal& local()
    {
        return local_;
    }

    uint64_t id; ///< Fiber identifier.

private:
    static constexpr uint64_t min_stack_size = 32; ///< Minimum usable stack size required for a fiber.

    /**
     * @brief Constructs a fiber object with the given function.
     *
     * @param f Function to execute in the fiber.
     */
    fiber(std::function<void()>&& f)
    {
        fn = std::forward<std::function<void()>>(f);
        id = idx.fetch_add(1, std::memory_order_relaxed);
    }

    /**
     * @brief Fiber entry function. Executes the user function and performs the final suspension.
     *
     * @param c Pointer to the fiber object.
     */
    static void fiber_entry(fiber* c)
    {
        c->fn();
        c->fn = nullptr;
        suspend<true>();
    }

    ffrt_fiber_t fb;          ///< Fiber context for execution.
    ffrt_fiber_t link;        ///< Context of the caller to switch back to.
    std::function<void()> fn; ///< Function to be executed by the fiber.
    bool done = false;        ///< Indicates whether the fiber has finished execution.
    FiberLocal local_;        ///< Fiber-local storage.
    static inline std::atomic_uint64_t idx{0}; ///< Atomic counter for generating unique fiber IDs.
};
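
/*
 * Usage sketch for fiber (illustrative; the buffer size and the suspend/resume
 * pattern are assumptions, not prescriptions). The fiber object is placed at
 * the head of a caller-provided stack; start() runs it until it finishes or
 * suspends, and a later start() resumes it:
 *
 *   using job = ffrt::fiber<>;
 *   alignas(16) static char stack[16 * 1024]; // caller-owned stack memory
 *
 *   auto* f = job::init([] {
 *       FFRT_API_LOGD("part 1");
 *       job::suspend();           // yield back to the caller
 *       FFRT_API_LOGD("part 2");
 *   }, stack, sizeof(stack));
 *
 *   if (f) {
 *       while (!f->start()) {}    // resume until the lambda returns
 *       f->destroy();             // the stack itself remains caller-owned
 *   }
 */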

} // namespace ffrt

#endif // FFRT_JOB_UTILS_H
/** @} */