• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /**
17  * @addtogroup FFRT
18  * @{
19  *
20  * @brief Provides FFRT C++ APIs.
21  *
22  * @since 20
23  */
24 
25 /**
26  * @file job_utils.h
27  *
28  * @brief Declares utilities for job scheduling, synchronization, and fiber management in FFRT.
29  *
30  * @library libffrt.z.so
31  * @kit FunctionFlowRuntimeKit
32  * @syscap SystemCapability.Resourceschedule.Ffrt.Core
33  * @since 20
34  */
35 
36 #ifndef FFRT_JOB_UTILS_H
37 #define FFRT_JOB_UTILS_H
38 
39 #include <cstdint>
40 #include <climits>
41 #include <unistd.h>
42 #include <sys/syscall.h>
43 #include <functional>
44 #include <string>
45 #include <thread>
46 #include <chrono>
47 #include <atomic>
48 #include <linux/futex.h>
49 #include "c/fiber.h"
50 
// Fallback hooks: each logging/tracing macro below expands to nothing unless the
// including translation unit defined it before this point.
#if !defined(FFRT_API_LOGE)
#define FFRT_API_LOGE(fmt, ...)
#endif

#if !defined(FFRT_API_LOGD)
#define FFRT_API_LOGD(fmt, ...)
#endif

#if !defined(FFRT_API_TRACE_INT64)
#define FFRT_API_TRACE_INT64(name, value)
#endif

#if !defined(FFRT_API_TRACE_SCOPE)
#define FFRT_API_TRACE_SCOPE(fmt, ...)
#endif
63 
64 namespace ffrt {
65 
/**
 * @namespace ffrt::detail
 * @brief Implementation helpers shared by the FFRT utilities declared in this header.
 */
namespace detail {
    /**
     * @brief Cache line size used as the alignment value for cache-line-separated members.
     */
    static constexpr uint64_t cacheline_size = 64;

    /**
     * @brief Base class that removes copy construction and copy assignment from derived types.
     *
     * The default constructor and destructor are protected, so the type can only be used as a base.
     */
    struct non_copyable {
    protected:
        non_copyable(const non_copyable&) = delete;
        non_copyable& operator=(const non_copyable&) = delete;

        non_copyable() = default;
        ~non_copyable() = default;
    };
} // ffrt::detail
87 
/**
 * @brief Executes the ARM "wait for event" (WFE) instruction.
 *
 * When neither __aarch64__ nor __arm__ is defined the function body is empty.
 */
static inline void wfe()
{
#if defined(__aarch64__) || defined(__arm__)
    __asm__ __volatile__("wfe" : : : "memory");
#endif
}
97 
/**
 * @brief Rounds a value up to a power of two.
 *
 * @param x Input value.
 * @return The smallest power of two that is greater than or equal to x. An input of 0 yields 1.
 *         For inputs above 2^63 the result does not fit in 64 bits and the function returns 0.
 */
static inline constexpr uint64_t align2n(uint64_t x)
{
    // Find the highest power of two that does not exceed x (1 when x is 0 or 1).
    uint64_t pow2 = 1;
    for (uint64_t rest = x >> 1; rest != 0; rest >>= 1) {
        pow2 <<= 1;
    }
    // x was not itself a power of two: step up to the next one.
    if (pow2 < x) {
        pow2 <<= 1;
    }
    return pow2;
}
113 
/**
 * @struct futex
 * @brief Static wrappers around the Linux futex system call.
 * @details Offers a wait and a wake operation, issued with the FUTEX_WAIT_PRIVATE and
 *          FUTEX_WAKE_PRIVATE opcodes respectively.
 */
struct futex {
    /**
     * @brief Issues FUTEX_WAIT_PRIVATE on the given futex word.
     *
     * The syscall result is only passed to the debug log; it is not reported to the caller.
     *
     * @param uaddr Address of the futex word to wait on.
     * @param val Value the futex word is expected to hold.
     */
    static inline void wait(int* uaddr, int val)
    {
        FFRT_API_LOGD("futex wait in %p", uaddr);
        const int ret = call(uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, 0);
        FFRT_API_LOGD("futex wait %p ret %d", uaddr, ret);
        static_cast<void>(ret); // keeps the variable "used" when the log macro expands to nothing
    }

    /**
     * @brief Issues FUTEX_WAKE_PRIVATE on the given futex word.
     *
     * The syscall result is only passed to the debug log; it is not reported to the caller.
     *
     * @param uaddr Address of the futex word whose waiters are woken.
     * @param num Maximum number of waiters to wake.
     */
    static inline void wake(int* uaddr, int num)
    {
        const int ret = call(uaddr, FUTEX_WAKE_PRIVATE, num, nullptr, 0);
        FFRT_API_LOGD("futex wake %p ret %d", uaddr, ret);
        static_cast<void>(ret); // keeps the variable "used" when the log macro expands to nothing
    }

private:
    /**
     * @brief Forwards the arguments to syscall(SYS_futex, ...), passing a null second address.
     *
     * @return The syscall result converted to int.
     */
    static inline int call(int* uaddr, int op, int val, const struct timespec* timeout, int bitset)
    {
        const auto rc = syscall(SYS_futex, uaddr, op, val, timeout, nullptr, bitset);
        return static_cast<int>(rc);
    }
};
156 
157 /**
158  * @struct atomic_wait
159  * @brief Atomic integer with futex-based wait/notify.
160  * @details Extends std::atomic<int> to support futex-based waiting and notification.
161  */
162 struct atomic_wait : std::atomic<int> {
163     using std::atomic<int>::atomic;
164     using std::atomic<int>::operator=;
165 
166     /**
167      * @brief Waits until the atomic value changes from val.
168      *
169      * @param val Expected value.
170      */
waitatomic_wait171     inline void wait(int val)
172     {
173         futex::wait(reinterpret_cast<int*>(this), val);
174     }
175 
176     /**
177      * @brief Notifies one waiting thread.
178      */
notify_oneatomic_wait179     inline auto notify_one()
180     {
181         futex::wake(reinterpret_cast<int*>(this), 1);
182     }
183 
184     /**
185      * @brief Notifies all waiting threads.
186      */
notify_allatomic_wait187     inline void notify_all()
188     {
189         futex::wake(reinterpret_cast<int*>(this), INT_MAX);
190     }
191 };
192 
/**
 * @struct ref_obj
 * @brief Intrusive reference-counting base class (CRTP: T must derive from ref_obj<T>).
 *
 * The counter starts at 1, so a freshly created object is already owned once; wrapping it in a
 * ptr adopts that reference. When the counter drops to zero the object is deleted as a T.
 *
 * @tparam T Object type.
 */
template <class T>
struct ref_obj {
    /**
     * @struct ptr
     * @brief Smart pointer for reference-counted objects.
     *
     * Copying adds a reference, moving transfers one, and destruction/reset releases one.
     */
    struct ptr {
        /**
         * @brief Default constructor. Initializes as a null pointer.
         */
        ptr() {}

        /**
         * @brief Constructs from a raw pointer, adopting the reference it represents.
         *
         * The reference count is not incremented.
         *
         * @param raw Raw pointer to the managed object (may be null).
         */
        ptr(void* raw) : p(static_cast<T*>(raw)) {}

        /**
         * @brief Destructor. Releases the held reference, if any.
         */
        ~ptr()
        {
            reset();
        }

        /**
         * @brief Copy constructor. Shares the object and increases its reference count.
         *
         * @param h The smart pointer to copy from.
         */
        inline ptr(ptr const& h) : p(h.p)
        {
            if (p) {
                p->inc_ref();
            }
        }

        /**
         * @brief Copy assignment operator.
         *
         * Takes a reference on the object held by h and releases the reference previously held
         * by this pointer.
         *
         * @param h The smart pointer to assign from.
         * @return Reference to this pointer.
         */
        inline ptr& operator=(ptr const& h)
        {
            if (this != &h) {
                T* const incoming = h.p;
                // Acquire the new reference first so that releasing the old object can never
                // destroy the object we are about to hold.
                if (incoming) {
                    incoming->inc_ref();
                }
                T* const outgoing = p;
                p = incoming;
                if (outgoing) {
                    outgoing->dec_ref();
                }
            }
            return *this;
        }

        /**
         * @brief Move constructor. Transfers ownership without touching the reference count.
         *
         * @param h The smart pointer to move from; left null.
         */
        inline ptr(ptr&& h) noexcept : p(h.p)
        {
            h.p = nullptr;
        }

        /**
         * @brief Move assignment operator.
         *
         * Releases the reference previously held by this pointer and takes over the one held by h.
         *
         * @param h The smart pointer to move from; left null.
         * @return Reference to this pointer.
         */
        inline ptr& operator=(ptr&& h) noexcept
        {
            if (this != &h) {
                T* const outgoing = p;
                p = h.p;
                h.p = nullptr;
                if (outgoing) {
                    outgoing->dec_ref();
                }
            }
            return *this;
        }

        /**
         * @brief Returns the raw pointer to the managed object.
         *
         * @return Raw pointer to the object.
         */
        constexpr inline T* get()
        {
            return p;
        }

        /**
         * @brief Returns the raw pointer to the managed object (const version).
         *
         * @return Const raw pointer to the object.
         */
        constexpr inline const T* get() const
        {
            return p;
        }

        /**
         * @brief Arrow operator for member access.
         *
         * @return Raw pointer to the object.
         */
        constexpr inline T* operator -> ()
        {
            return p;
        }

        /**
         * @brief Arrow operator for member access (const version).
         *
         * @return Const raw pointer to the object.
         */
        constexpr inline const T* operator -> () const
        {
            return p;
        }

        /**
         * @brief Conversion operator to void pointer.
         *
         * @return Raw pointer as void*.
         */
        inline operator void* () const
        {
            return p;
        }

        /**
         * @brief Releases the held reference (if any) and becomes null.
         */
        inline void reset()
        {
            if (p) {
                p->dec_ref();
                p = nullptr;
            }
        }

    private:
        T* p = nullptr; ///< Raw pointer to the managed reference-counted object.
    };

    /**
     * @brief Creates a new T with the given constructor arguments and wraps it in a ptr.
     *
     * The returned ptr adopts the object's initial reference.
     */
    template<class... Args>
    static inline ptr make(Args&& ... args)
    {
        auto p = new T(std::forward<Args>(args)...);
        FFRT_API_LOGD("%s new %p", __PRETTY_FUNCTION__, p);
        return ptr(p);
    }

    /**
     * @brief Returns a function-local static ptr, created on first call.
     *
     * The arguments are only used by the call that performs the initialization. Each distinct
     * Args pack is a separate template instantiation with its own static instance.
     */
    template<class... Args>
    static ptr& singleton(Args&& ... args)
    {
        static ptr s = make(std::forward<Args>(args)...);
        return s;
    }

    /**
     * @brief Increments the reference count.
     */
    inline void inc_ref()
    {
        // Relaxed is sufficient: taking a new reference requires already holding one.
        ref.fetch_add(1, std::memory_order_relaxed);
    }

    /**
     * @brief Decrements the reference count and deletes the object when it reaches zero.
     */
    inline void dec_ref()
    {
        // acq_rel: the release half publishes this owner's writes to the object, the acquire half
        // makes every other owner's writes visible to the thread that ends up running the destructor.
        if (ref.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            FFRT_API_LOGD("%s delete %p", __PRETTY_FUNCTION__, this);
            delete static_cast<T*>(this);
        }
    }

private:
    std::atomic_uint64_t ref{1}; ///< Atomic reference counter for managing the lifetime of the object.
};
391 
392 /**
393  * @struct mpmc_queue
394  * @brief Lock-free multi-producer multi-consumer queue.
395  *
396  * @tparam T Element type.
397  */
398 template <typename T>
399 struct mpmc_queue : detail::non_copyable {
400     /**
401      * @brief Constructs a queue with the given capacity.
402      *
403      * @param cap Capacity of the queue.
404      */
mpmc_queuempmc_queue405     mpmc_queue(uint64_t cap) : capacity(align2n(cap)), mask(capacity - 1)
406     {
407         if (std::is_pod_v<Item>) {
408             q = static_cast<Item*>(malloc(capacity * sizeof(Item)));
409         } else {
410             q = new Item [capacity];
411         }
412         for (size_t i = 0; i < capacity; ++i) {
413             q[i].iwrite_exp.store(i, std::memory_order_relaxed);
414             q[i].iread_exp.store(-1, std::memory_order_relaxed);
415         }
416 
417         iwrite_.store(0, std::memory_order_relaxed);
418         iread_.store(0, std::memory_order_relaxed);
419     }
420 
421     /**
422      * @brief Destructor.
423      */
~mpmc_queuempmc_queue424     ~mpmc_queue()
425     {
426         if (std::is_pod_v<Item>) {
427             free(q);
428         } else {
429             delete [] q;
430         }
431     }
432 
433     /**
434      * @brief Returns the current size of the queue.
435      */
sizempmc_queue436     inline uint64_t size() const
437     {
438         auto head = iread_.load(std::memory_order_relaxed);
439         return iwrite_.load(std::memory_order_relaxed) - head;
440     }
441 
442     /**
443      * @brief Attempts to push an element into the queue.
444      *
445      * @param data Element to push.
446      * @return True if successful, false otherwise.
447      */
try_pushmpmc_queue448     bool try_push(const T& data)
449     {
450         Item* i;
451         auto iwrite = iwrite_.load(std::memory_order_relaxed);
452         for (;;) {
453             i = &q[iwrite & mask];
454             if (i->iwrite_exp.load(std::memory_order_relaxed) != iwrite) {
455                 return false;
456             }
457             if ((iwrite_.compare_exchange_weak(iwrite, iwrite + 1, std::memory_order_relaxed))) {
458                 break;
459             }
460         }
461         i->data = data;
462         i->iread_exp.store(iwrite, std::memory_order_release);
463         return true;
464     }
465 
466     /**
467      * @brief Attempts to pop an element from the queue.
468      *
469      * @param result Output parameter for the popped element.
470      * @return True if successful, false otherwise.
471      */
try_popmpmc_queue472     bool try_pop(T& result)
473     {
474         Item* i;
475         auto iread = iread_.load(std::memory_order_relaxed);
476         for (;;) {
477             i = &q[iread & mask];
478             if (i->iread_exp.load(std::memory_order_relaxed) != iread) {
479                 return false;
480             }
481             if (iread_.compare_exchange_weak(iread, iread + 1, std::memory_order_relaxed)) {
482                 break;
483             }
484         }
485         result = i->data;
486         i->iwrite_exp.store(iread + capacity, std::memory_order_release);
487         return true;
488     }
489 
490 private:
491     uint64_t capacity; ///< Capacity of the queue (must be a power of two).
492     uint64_t mask;     ///< Bitmask used for efficient index calculation (capacity - 1).
493 
494     /**
495      * @brief Internal structure representing a queue slot.
496      */
497     struct Item {
498         T data;                           ///< Data stored in the queue slot.
499         std::atomic<uint64_t> iwrite_exp; ///< Expected write index after a read operation.
500         std::atomic<uint64_t> iread_exp;  ///< Expected read index after a write operation
501     };
502 
503     alignas(detail::cacheline_size) Item* q;                       ///< Pointer to the array of queue slots (items).
504     alignas(detail::cacheline_size) std::atomic<uint64_t> iwrite_; ///< Global write index for the queue.
505     alignas(detail::cacheline_size) std::atomic<uint64_t> iread_;  ///< Global read index for the queue.
506 };
507 
/**
 * @brief Task entry point type: a plain function that receives one untyped argument.
 */
using func_ptr = void (*)(void*);

/**
 * @brief A task described by an entry point and the argument to hand to it.
 */
struct ptr_task {
    func_ptr f; ///< Task entry point.
    void* arg;  ///< Untyped argument passed to the entry point.
};
520 
521 /**
522  * @struct runnable_queue
523  * @brief Runnable queue based on a given queue type.
524  *
525  * @tparam Queue Queue template type.
526  */
527 template<template<class> class Queue>
528 struct runnable_queue : Queue<ptr_task> {
529     /**
530      * @brief Constructs a runnable queue.
531      *
532      * @param depth Queue depth.
533      * @param name Queue name.
534      */
runnable_queuerunnable_queue535     runnable_queue(uint64_t depth, const std::string& name) : Queue<ptr_task>(depth), name(name) {}
536 
537     /**
538      * @brief Attempts to run a task from the queue.
539      *
540      * @return True if a task was run, false otherwise.
541      */
try_runrunnable_queue542     inline bool try_run()
543     {
544         ptr_task job;
545         auto suc = this->try_pop(job);
546         if (!suc) {
547             return false;
548         }
549 
550         FFRT_API_TRACE_INT64(name.c_str(), this->size());
551         job.f(job.arg);
552         return true;
553     }
554 
555     /**
556      * @brief Pushes a task into the queue with a given policy.
557      *
558      * @tparam policy Push policy (0: sleep, 1: run).
559      * @param f Function pointer.
560      * @param p Argument pointer.
561      */
562     template <int policy>
pushrunnable_queue563     inline void push(func_ptr f, void* p)
564     {
565         uint64_t us = 1;
566         while (!this->try_push({f, p})) {
567             if constexpr(policy == 0) {
568                 std::this_thread::sleep_for(std::chrono::microseconds(us));
569                 us = us << 1;
570             } else if constexpr(policy == 1) {
571                 try_run();
572             }
573         }
574         FFRT_API_TRACE_INT64(name.c_str(), this->size());
575     }
576 
577     const std::string name; ///< Queue name.
578 };
579 
/**
 * @struct clock
 * @brief Time-stamp helpers built on std::chrono::high_resolution_clock.
 */
struct clock {
    using stamp = std::chrono::time_point<std::chrono::high_resolution_clock>;

    /**
     * @brief Returns the current time stamp of std::chrono::high_resolution_clock.
     */
    static inline stamp now()
    {
        using source = std::chrono::high_resolution_clock;
        return source::now();
    }

    /**
     * @brief Returns the number of nanoseconds from one time stamp to another.
     *
     * @param from Start time.
     * @param to End time (default: now).
     * @return Nanosecond count of (to - from), converted to uint64_t.
     */
    static inline uint64_t ns(const stamp& from, stamp to = now())
    {
        const auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(to - from);
        return static_cast<uint64_t>(elapsed.count());
    }
};
607 
608 /**
609  * @struct fiber
610  * @brief Lightweight fiber implementation.
611  *
612  * @tparam UsageId Usage identifier.
613  * @tparam FiberLocal Type for fiber-local storage.
614  * @tparam ThreadLocal Type for thread-local storage.
615  */
616 template <int UsageId = 0, class FiberLocal = char, class ThreadLocal = char>
617 struct fiber : detail::non_copyable {
618     /**
619      * @brief Thread environment for fiber execution.
620      */
621     struct thread_env :  detail::non_copyable {
622         fiber* cur = nullptr;
623         bool (*cond)(void*) = nullptr;
624         ThreadLocal tl;
625     };
626 
627     /**
628      * @brief Returns the thread-local environment.
629      */
envfiber630     static __attribute__((noinline)) thread_env& env()
631     {
632         static thread_local thread_env ctx;
633         return ctx;
634     }
635 
636     /**
637      * @brief Initializes a fiber with a function and stack.
638      *
639      * @param f Function to run.
640      * @param stack Stack memory.
641      * @param stack_size Stack size.
642      * @return Pointer to the created fiber.
643      */
initfiber644     static fiber* init(std::function<void()>&& f, void* stack, size_t stack_size)
645     {
646         if (stack == nullptr || stack_size < sizeof(fiber) + min_stack_size) {
647             return nullptr;
648         }
649         auto c = new (stack) fiber(std::forward<std::function<void()>>(f));
650         if (ffrt_fiber_init(&c->fb, reinterpret_cast<void(*)(void*)>(fiber_entry), c,
651             static_cast<char*>(stack) + sizeof(fiber), stack_size - sizeof(fiber))) {
652             c->~fiber<UsageId, FiberLocal, ThreadLocal>();
653             return nullptr;
654         }
655 
656         FFRT_API_LOGD("job %lu create", c->id);
657         return c;
658     }
659 
660     /**
661      * @brief Destroys the fiber.
662      */
destroyfiber663     inline void destroy()
664     {
665         FFRT_API_LOGD("job %lu destroy", id);
666         this->~fiber<UsageId, FiberLocal, ThreadLocal>();
667     }
668 
669     /**
670      * @brief Starts the fiber execution.
671      *
672      * @return True if finished, false otherwise.
673      */
startfiber674     bool start()
675     {
676         bool done;
677         auto& e = fiber::env();
678 
679         do {
680             e.cond = nullptr;
681             e.cur = this;
682             FFRT_API_LOGD("job %lu switch in", id);
683             ffrt_fiber_switch(&link, &fb);
684             FFRT_API_LOGD("job %lu switch out", id);
685             done = this->done;
686         } while (e.cond && !(e.cond)(this));
687         e.cond = nullptr;
688         return done;
689     }
690 
691     /**
692      * @brief Suspends the current fiber.
693      *
694      * @tparam is_final Whether this is the final suspension.
695      * @param e Thread environment.
696      * @param cond Condition function.
697      */
698     template<bool is_final = false>
699     static inline void suspend(thread_env& e, bool (*cond)(void*) = nullptr)
700     {
701         auto j = e.cur;
702         if constexpr(is_final) {
703             j->done = true;
704         } else {
705             e.cond = cond;
706         }
707         e.cur = nullptr;
708 
709         ffrt_fiber_switch(&j->fb, &j->link);
710     }
711 
712     /**
713      * @brief Suspends the current fiber.
714      *
715      * @tparam is_final Whether this is the final suspension.
716      * @param cond Condition function.
717      */
718     template<bool is_final = false>
719     static inline void suspend(bool (*cond)(void*) = nullptr)
720     {
721         suspend<is_final>(fiber::env(), cond);
722     }
723 
724     /**
725      * @brief Returns the fiber-local storage.
726      */
localfiber727     FiberLocal& local()
728     {
729         return local_;
730     }
731 
732     uint64_t id; ///< Fiber identifier.
733 
734 private:
735     static constexpr uint64_t min_stack_size = 32; ///< Minimum stack size required for a fiber.
736 
737     /**
738      * @brief Constructs a fiber object with the given function and stack size.
739      *
740      * @param f Function to execute in the fiber.
741      */
fiberfiber742     fiber(std::function<void()>&& f)
743     {
744         fn = std::forward<std::function<void()>>(f);
745         id = idx.fetch_add(1, std::memory_order_relaxed);
746     }
747 
748     /**
749      * @brief Fiber entry function. Executes the user function and suspends the fiber upon completion.
750      *
751      * @param c Pointer to the fiber object.
752      */
fiber_entryfiber753     static void fiber_entry(fiber* c)
754     {
755         c->fn();
756         c->fn = nullptr;
757         suspend<true>();
758     }
759 
760     ffrt_fiber_t fb;          ///< Fiber context for execution.
761     ffrt_fiber_t link;        ///< Link to the previous fiber context.
762     std::function<void()> fn; ///< Function to be executed by the fiber.
763     bool done = false;        ///< Indicates whether the fiber has finished execution.
764     FiberLocal local_;        ///< Fiber-local storage.
765     static inline std::atomic_uint64_t idx{0}; ///< Atomic counter for generating unique fiber IDs.
766 };
767 
768 } // namespace ffrt
769 
770 #endif // FFRT_JOB_UTILS_H
771 /** @} */