/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @addtogroup FFRT
 * @{
 *
 * @brief Provides FFRT C++ APIs.
 *
 * @since 20
 */

/**
 * @file job_partner.h
 *
 * @brief Declares the job partner interfaces in C++.
 *
 * @library libffrt.z.so
 * @kit FunctionFlowRuntimeKit
 * @syscap SystemCapability.Resourceschedule.Ffrt.Core
 * @since 20
 */

36 #ifndef FFRT_JOB_PARTNER_H
37 #define FFRT_JOB_PARTNER_H
38 
39 #include <functional>
40 #include <string>
41 #include <atomic>
42 #include "job_utils.h"
43 #include "cpp/task.h"
44 
45 namespace ffrt {
46 
47 /**
48  * @struct job_partner_attr
49  * @brief Defines the job partner attribute structure for controlling worker concurrency.
50  *
51  * This structure provides initialization and configuration for job partner attributes,
52  * including QoS, maximum worker number, ratio, threshold, busy wait time, and queue depth.
53  *
54  * The relationship between job number and partner number is illustrated as follows:
55  * @verbatim
56  * partner_num
57  *     ^
58  *     |
59  * max |            ------------------
60  *     |           /
61  *     |    ratio /
62  *     |         /
63  *     |        /
64  *     |       /
65  *     |      /
66  *     +------------------------------> job_num
67  *         threshold
68  * @endverbatim
69  *
70  * - The vertical axis is partner_num, and the horizontal axis is job_num.
71  * - Threshold: When job_num is less than threshold, partner_num is 0.
72  * - Ratio control: When job_num is between threshold and "max * ratio + threshold",
73  *   partner_num is calculated as "round((job_num - threshold) / ratio)".
74  * - Maximum value: When job_num is greater than "max * ratio + threshold", partner_num is the maximum value.
75  *
76  * @since 20
77  */
78 struct job_partner_attr {
79     /**
80      * @brief Set QoS level.
81      *
82      * @param q QoS value.
83      * @return Reference to this attribute object.
84      */
qosjob_partner_attr85     inline job_partner_attr& qos(qos q)
86     {
87         this->qos_ = q;
88         return *this;
89     }
90 
91     /**
92      * @brief Set max number of partner workers.
93      *
94      * @param v Maximum number of workers.
95      * @return Reference to this attribute object.
96      */
max_numjob_partner_attr97     inline job_partner_attr& max_num(uint64_t v)
98     {
99         this->max_num_ = v;
100         return *this;
101     }
102 
103     /**
104      * @brief Set the ratio parameter for controlling the number of workers.
105      *
106      * @param v Ratio value.
107      * @return Reference to this attribute object.
108      */
ratiojob_partner_attr109     inline job_partner_attr& ratio(uint64_t v)
110     {
111         this->ratio_ = v;
112         return *this;
113     }
114 
115     /**
116      * @brief Set the threshold parameter for controlling the number of workers.
117      *
118      * @param v Threshold value.
119      * @return Reference to this attribute object.
120      */
thresholdjob_partner_attr121     inline job_partner_attr& threshold(uint64_t v)
122     {
123         this->threshold_ = v;
124         return *this;
125     }
126 
127     /**
128      * @brief Set last worker's retry busy time (in microseconds).
129      *
130      * @param us Busy wait time in microseconds.
131      * @return Reference to this attribute object.
132      */
busyjob_partner_attr133     inline job_partner_attr& busy(uint64_t us)
134     {
135         this->busy_us_ = us;
136         return *this;
137     }
138 
139     /**
140      * @brief Set the depth of job queue.
141      *
142      * @param depth Queue depth.
143      * @return Reference to this attribute object.
144      */
queue_depthjob_partner_attr145     inline job_partner_attr& queue_depth(uint64_t depth)
146     {
147         this->queue_depth_ = depth;
148         return *this;
149     }
150 
151     /**
152      * @brief Get QoS level.
153      *
154      * @return QoS value.
155      */
qosjob_partner_attr156     inline int qos() const
157     {
158         return this->qos_;
159     }
160 
161     /**
162      * @brief Get max number of partner workers.
163      *
164      * @return Maximum number of workers.
165      */
max_numjob_partner_attr166     inline uint64_t max_num() const
167     {
168         return this->max_num_;
169     }
170 
171     /**
172      * @brief Get the ratio parameter for controlling the number of workers.
173      *
174      * @return Ratio value.
175      */
ratiojob_partner_attr176     inline uint64_t ratio() const
177     {
178         return this->ratio_;
179     }
180 
181     /**
182      * @brief Get the threshold parameter for controlling the number of workers.
183      *
184      * @return Threshold value.
185      */
thresholdjob_partner_attr186     inline uint64_t threshold() const
187     {
188         return this->threshold_;
189     }
190 
191     /**
192      * @brief Get last worker's retry busy time (in microseconds).
193      *
194      * @return Busy wait time in microseconds.
195      */
busyjob_partner_attr196     inline uint64_t busy() const
197     {
198         return this->busy_us_;
199     }
200 
201     /**
202      * @brief Get the depth of job queue.
203      *
204      * @return Queue depth.
205      */
queue_depthjob_partner_attr206     inline uint64_t queue_depth() const
207     {
208         return this->queue_depth_;
209     }
210 
211 private:
212     int qos_ = ffrt::qos_user_initiated;             ///< QoS level for the job partner.
213     uint64_t max_num_ = default_partner_max;         ///< Maximum number of partner workers.
214     uint64_t ratio_ = default_partner_ratio;         ///< Ratio for scaling the number of workers.
215     uint64_t threshold_ = default_partner_threshold; ///< Threshold for scaling the number of workers.
216     uint64_t busy_us_ = default_partner_delay_us;    ///< Busy wait time (us) for the last worker before exit.
217     uint64_t queue_depth_ = default_q_depth;         ///< Depth of the job queue.
218 
219     static constexpr uint64_t default_partner_max = 2;        ///< Default max number of partner workers.
220     static constexpr uint64_t default_partner_ratio = 20;     ///< Default ratio for worker scaling.
221     static constexpr uint64_t default_partner_threshold = 0;  ///< Default threshold for worker scaling.
222     static constexpr uint64_t default_partner_delay_us = 100; ///< Default busy wait time (us) for last worker.
223     static constexpr uint64_t default_q_depth = 1024;         ///< Default depth of the job queue.
224 };
225 
226 /**
227  * @struct job_partner
228  * @brief Provide the function of submitting tasks and waiting for task completion.
229  *
230  * @tparam UsageId The user-defined job type.
231  * @since 20
232  */
233 template <int UsageId = 0>
234 struct job_partner : ref_obj<job_partner<UsageId>>, detail::non_copyable {
235     /**
236     * @brief Retrieves the job_partner instance in the current thread.
237     *
238     * @param attr Job partner attributes.
239     * @return Reference to the job_partner instance.
240     * @since 20
241     */
242     static __attribute__((noinline)) auto& get_partner_of_this_thread(const job_partner_attr& attr = {})
243     {
244         static thread_local auto s = ref_obj<job_partner<UsageId>>::make(attr);
245         return s;
246     }
247 
248     /**
249      * @brief Submits a suspendable job to the partner thread (blocking).
250      *
251      * This function submits a job that can be suspended and resumed, using the specified stack and stack size.
252      * The function is blocking: it will block the current thread until the job is successfully executed.
253      * It can be called from both master and non-master threads. If the queue is full, it will retry until successful.
254      *
255      * @tparam boost Indicates whether to dynamically add workers.
256      * @param suspendable_job The job executor function closure.
257      * @param stack Pointer to the stack memory for the job.
258      * @param stack_size Size of the stack memory.
259      * @return Returns 1 if job initialization failed (e.g., invalid stack_size); 0 if submission succeeded.
260      * @since 20
261      */
262     template <bool boost = true>
submitjob_partner263     inline int submit(std::function<void()>&& suspendable_job, void* stack, size_t stack_size)
264     {
265         auto p = job_t::init(std::forward<std::function<void()>>(suspendable_job), stack, stack_size);
266         if (p == nullptr) {
267             FFRT_API_LOGE("job initialize failed, maybe invalid stack_size");
268             return 1;
269         }
270         FFRT_API_LOGD("submit %lu", p->id);
271         p->local().partner = this;
272         submit<boost>(suspendable_job_func, p);
273         return 0;
274     }
275 
276     /**
277      * @brief Submits a non-suspendable job to the partner thread (non-blocking).
278      *
279      * This function submits a job that cannot be suspended. The function is non-blocking:
280      * it will not block the current thread, and the job will be asynchronously executed by a partner worker thread.
281      * If the queue is full, it will retry until successful.
282      *
283      * @tparam boost Indicates whether to dynamically add workers.
284      * @param non_suspendable_job The job executor function closure.
285      * @since 20
286      */
287     template <bool boost = true>
submitjob_partner288     inline void submit(std::function<void()>&& non_suspendable_job)
289     {
290         auto p = new non_suspendable_job_t(std::forward<std::function<void()>>(non_suspendable_job), this);
291         FFRT_API_LOGD("non-suspendable job submit: %p", p);
292         submit<boost>(non_suspendable_job_func, p);
293     }
294 
295     /**
296      * @brief Submits a job to the master thread and suspends the current task until completion.
297      *
298      * This function submits a job to the master thread. The current task will be paused after submitting the closure,
299      * and will resume only after the master thread finishes executing the closure. If called outside a job context,
300      * the closure will be executed directly. If the queue is full, it will retry until successful.
301      *
302      * @param job The job executor function closure.
303      * @since 20
304      */
submit_to_masterjob_partner305     static inline void submit_to_master(std::function<void()>&& job)
306     {
307         auto& e = job_t::env();
308         auto j = e.cur;
309         if (j == nullptr || j->local().partner == e.tl.token) {
310             return job();
311         }
312         j->local().partner->submit_to_master(e, j, std::forward<std::function<void()>>(job));
313     }
314 
315     /**
316      * @brief Waits until all submitted tasks are complete.
317      *
318      * This function blocks the calling thread until all submitted jobs have finished execution.
319      * It can only be called from the master thread; calling from a non-master thread will fail.
320      *
321      * @tparam help_partner If true, the current thread will also consume jobs from the worker queue.
322      * @tparam busy_wait_us If the worker queue is empty, the current thread will busy-wait for
323      *                      this duration (in microseconds) before sleeping.
324      *                      If a job is submitted during this time, the thread will consume it.
325      * @return Returns 1 if called from a non-master thread (wait fails); 0 if wait succeeds.
326      * @since 20
327      */
328     template<bool help_partner = true, uint64_t busy_wait_us = 100>
waitjob_partner329     int wait()
330     {
331         if (!this_thread_is_master()) {
332             FFRT_API_LOGE("wait only can be called on master thread");
333             return 1;
334         }
335         FFRT_API_TRACE_SCOPE("%s wait on master", name.c_str());
336         FFRT_API_LOGD("wait on master");
337 
338         for (;;) {
339 _begin_consume_master_job:
340             int idx = 0;
341             while (master_q.try_run()) {
342                 if (((++idx & 0xF) == 0) && partner_num.load(std::memory_order_relaxed) == 0) {
343                     job_partner_task();
344                 }
345             }
346 
347             auto concurrency = job_num.load();
348             auto wn = partner_num.load(std::memory_order_relaxed);
349             if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) {
350                 job_partner_task();
351             }
352             if (help_partner && partner_q.try_run()) {
353                 goto _begin_consume_master_job;
354             }
355             if (concurrency == 0) {
356                 break;
357             } else {
358                 auto s = clock::now();
359                 while (!help_partner && busy_wait_us > 0 && clock::ns(s) < busy_wait_us * 1000) {
360                     if (master_q.try_run()) {
361                         goto _begin_consume_master_job;
362                     }
363                     wfe();
364                 }
365                 master_wait.wait(0);
366                 master_wait = 0;
367             }
368         }
369 
370         FFRT_API_LOGD("wait success");
371         return 0;
372     }
373 
374     /**
375      * @brief Judge whether the current thread is the job_partner master.
376      *
377      * @return true if the current thread is the job_partner master; false otherwise.
378      * @since 20
379      */
this_thread_is_masterjob_partner380     inline bool this_thread_is_master()
381     {
382         return job_t::env().tl.token == this;
383     }
384 
385 private:
386     friend ref_obj<job_partner>; ///< Allows ref_obj to access private members for reference counting.
387 
388     /**
389      * @brief Fiber-local storage structure for master function and partner pointer.
390      */
391     struct fls {
392         std::function<void()> master_f; ///< Function to be executed by the master.
393         job_partner* partner;           ///< Pointer to the associated job_partner instance.
394     };
395 
396     /**
397      * @brief Thread-local storage structure for token identification.
398      */
399     struct tls {
400         void* token = nullptr; ///< Token used to identify the current job_partner instance.
401     };
402 
403     /**
404      * @brief Alias for the fiber type used by this job_partner.
405      */
406     using job_t = fiber<UsageId, fls, tls>;
407 
408     /**
409      * @brief Structure representing a non-suspendable job.
410      */
411     struct non_suspendable_job_t {
412         /**
413          * @brief Constructs a non_suspendable_job_t object.
414          *
415          * @param fn Function to execute.
416          * @param p Pointer to the associated job_partner.
417          */
non_suspendable_job_tjob_partner::non_suspendable_job_t418         non_suspendable_job_t(std::function<void()>&& fn, job_partner* p)
419             : fn(std::forward<std::function<void()>>(fn)), partner(p) {}
420 
421         std::function<void()> fn; ///< Function to execute.
422         job_partner* partner;     ///< Pointer to the associated job_partner.
423     };
424 
425     /**
426      * @brief Constructs a job_partner object with the given attributes.
427      *
428      * @param attr Job partner attributes.
429      */
430     job_partner(const job_partner_attr& attr = {})
431         : name("partner<" + std::to_string(UsageId) + ">" + std::to_string(syscall(SYS_gettid))),
432         attr(attr), partner_q(attr.queue_depth(), name + "_pq"), master_q(attr.queue_depth(), name + "_mq")
433     {
434         concurrency_name = name + "_cc#";
435         partner_num_name = name + "_p#";
436         job_t::env().tl.token = this;
437     }
438 
439     /**
440      * @brief Submits a job to the partner queue.
441      *
442      * @tparam boost Indicates whether to dynamically add workers.
443      * @param f Function pointer for the job.
444      * @param p Pointer to the job data.
445      */
446     template <bool boost>
submitjob_partner447     void submit(func_ptr f, void* p)
448     {
449         auto concurrency = job_num.fetch_add(1, std::memory_order_relaxed) + 1;
450         (void)(concurrency);
451         FFRT_API_TRACE_INT64(concurrency_name.c_str(), concurrency);
452         partner_q.template push<1>(f, p);
453 
454         auto wn = partner_num.load(std::memory_order_relaxed);
455         if (boost || attr.threshold()) {
456             if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) {
457                 job_partner_task();
458             }
459         } else {
460             if (wn == 0) {
461                 job_partner_task();
462             }
463         }
464     }
465 
466     /**
467      * @brief Submits a job to the master queue and suspends the current fiber.
468      *
469      * @tparam Env Environment type.
470      * @param e Reference to the environment.
471      * @param p Pointer to the job fiber.
472      * @param job Function to execute.
473      */
474     template<class Env>
submit_to_masterjob_partner475     void submit_to_master(Env& e, job_t* p, std::function<void()>&& job)
476     {
477         FFRT_API_LOGD("job %lu submit to master", (p ? p->id : -1UL));
478         p->local().master_f = std::forward<std::function<void()>>(job);
479         p->suspend(e, submit_to_master_suspend_func);
480     }
481 
482     /**
483      * @brief Suspend function used when submitting to master.
484      *
485      * @param p Pointer to the job fiber.
486      * @return True if suspension is successful.
487      */
submit_to_master_suspend_funcjob_partner488     static bool submit_to_master_suspend_func(void* p)
489     {
490         auto partner = (static_cast<job_t*>(p))->local().partner;
491         partner->master_q.template push<0>(master_run_func, p);
492         partner->notify_master();
493         return true;
494     }
495 
496     /**
497      * @brief Notifies the master thread to wake up if waiting.
498      */
notify_masterjob_partner499     inline void notify_master()
500     {
501         if (master_wait.exchange(1) == 0) {
502             master_wait.notify_one();
503         }
504     }
505 
506     /**
507      * @brief Launches a new partner worker task.
508      */
job_partner_taskjob_partner509     void job_partner_task()
510     {
511         auto partner_n = partner_num.fetch_add(1) + 1;
512         (void)(partner_n);
513         FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n);
514         FFRT_API_TRACE_SCOPE("%s add task", name.c_str());
515 
516         ref_obj<job_partner<UsageId>>::inc_ref();
517         ffrt::submit([this] {
518             auto wn = partner_num.load(std::memory_order_relaxed);
519             if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) {
520                 job_partner_task();
521             }
522 _re_run_partner:
523             while (partner_q.try_run());
524             if (partner_num.load() == 1 && attr.busy() > 0) { // last partner delay
525                 FFRT_API_TRACE_SCOPE("stall");
526                 auto s = clock::now();
527                 while (clock::ns(s) < attr.busy() * 1000) {
528                     if (partner_q.try_run()) {
529                         goto _re_run_partner;
530                     }
531                     wfe();
532                 }
533             }
534             auto partner_n = partner_num.fetch_sub(1) - 1;
535             (void)(partner_n);
536             FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n);
537             if (partner_q.try_run()) {
538                 auto partner_n = partner_num.fetch_add(1) + 1;
539                 (void)(partner_n);
540                 FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n);
541                 goto _re_run_partner;
542             }
543             ref_obj<job_partner<UsageId>>::dec_ref();
544             }, {}, {}, task_attr().qos(attr.qos()).name(name.c_str()));
545     }
546 
547     /**
548      * @brief Function executed by a suspendable job.
549      *
550      * @param p_ Pointer to the job fiber.
551      */
suspendable_job_funcjob_partner552     static void suspendable_job_func(void* p_)
553     {
554         auto p = (static_cast<job_t*>(p_));
555         FFRT_API_LOGD("run partner job %lu", p->id);
556         FFRT_API_TRACE_SCOPE("pjob%lu", p->id);
557         if (p->start()) { // job done
558             auto partner = p->local().partner;
559             auto concurrency = partner->job_num.fetch_sub(1, std::memory_order_acquire) - 1;
560             FFRT_API_TRACE_INT64(partner->concurrency_name.c_str(), concurrency);
561             if (concurrency == 0) {
562                 partner->notify_master();
563             }
564             p->destroy();
565         }
566     }
567 
568     /**
569      * @brief Function executed by a non-suspendable job.
570      *
571      * @param p_ Pointer to the non_suspendable_job_t object.
572      */
non_suspendable_job_funcjob_partner573     static void non_suspendable_job_func(void* p_)
574     {
575         auto p = static_cast<non_suspendable_job_t*>(p_);
576         FFRT_API_LOGD("run non-suspendable job %p", p);
577         FFRT_API_TRACE_SCOPE("nsjob");
578         (p->fn)();
579         auto partner = p->partner;
580         auto concurrency = partner->job_num.fetch_sub(1, std::memory_order_acquire) - 1;
581         FFRT_API_TRACE_INT64(partner->concurrency_name.c_str(), concurrency);
582         if (concurrency == 0) {
583             partner->notify_master();
584         }
585         delete p;
586     }
587 
588     /**
589      * @brief Function executed by the master to run a job.
590      *
591      * @param p_ Pointer to the job fiber.
592      */
master_run_funcjob_partner593     static void master_run_func(void* p_)
594     {
595         auto p = static_cast<job_t*>(p_);
596         {
597             FFRT_API_LOGD("run master job %lu", p->id);
598             FFRT_API_TRACE_SCOPE("mjob%lu", p->id);
599             p->local().master_f();
600             p->local().master_f = nullptr;
601         }
602         p->local().partner->partner_q.template push<1>(suspendable_job_func, p);
603     }
604 
605     std::string name;             ///< Name of the job_partner instance.
606     std::string concurrency_name; ///< Name used for concurrency tracing.
607     std::string partner_num_name; ///< Name used for partner number tracing.
608     job_partner_attr attr;        ///< Attributes for configuring the job_partner.
609 
610     alignas(detail::cacheline_size) std::atomic_uint64_t partner_num{0}; ///< Number of active partner workers.
611     alignas(detail::cacheline_size) std::atomic_uint64_t job_num{0};     ///< Number of active jobs.
612 
613     runnable_queue<mpmc_queue> partner_q; ///< Runnable queue for partner jobs.
614     runnable_queue<mpmc_queue> master_q;  ///< Runnable queue for master jobs.
615     atomic_wait master_wait = 0;          ///< Synchronization primitive for master waiting.
616 };
617 
618 } // namespace ffrt
619 
620 #endif // FFRT_JOB_PARTNER_H
621 /** @} */