1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 /** 17 * @addtogroup FFRT 18 * @{ 19 * 20 * @brief Provides FFRT C++ APIs. 21 * 22 * @since 20 23 */ 24 25 /** 26 * @file job_partner.h 27 * 28 * @brief Declares the job partner interfaces in C++. 29 * 30 * @library libffrt.z.so 31 * @kit FunctionFlowRuntimeKit 32 * @syscap SystemCapability.Resourceschedule.Ffrt.Core 33 * @since 20 34 */ 35 36 #ifndef FFRT_JOB_PARTNER_H 37 #define FFRT_JOB_PARTNER_H 38 39 #include <functional> 40 #include <string> 41 #include <atomic> 42 #include "job_utils.h" 43 #include "cpp/task.h" 44 45 namespace ffrt { 46 47 /** 48 * @struct job_partner_attr 49 * @brief Defines the job partner attribute structure for controlling worker concurrency. 50 * 51 * This structure provides initialization and configuration for job partner attributes, 52 * including QoS, maximum worker number, ratio, threshold, busy wait time, and queue depth. 53 * 54 * The relationship between job number and partner number is illustrated as follows: 55 * @verbatim 56 * partner_num 57 * ^ 58 * | 59 * max | ------------------ 60 * | / 61 * | ratio / 62 * | / 63 * | / 64 * | / 65 * | / 66 * +------------------------------> job_num 67 * threshold 68 * @endverbatim 69 * 70 * - The vertical axis is partner_num, and the horizontal axis is job_num. 71 * - Threshold: When job_num is less than threshold, partner_num is 0. 72 * - Ratio control: When job_num is between threshold and "max * ratio + threshold", 73 * partner_num is calculated as "round((job_num - threshold) / ratio)". 74 * - Maximum value: When job_num is greater than "max * ratio + threshold", partner_num is the maximum value. 75 * 76 * @since 20 77 */ 78 struct job_partner_attr { 79 /** 80 * @brief Set QoS level. 81 * 82 * @param q QoS value. 83 * @return Reference to this attribute object. 84 */ qosjob_partner_attr85 inline job_partner_attr& qos(qos q) 86 { 87 this->qos_ = q; 88 return *this; 89 } 90 91 /** 92 * @brief Set max number of partner workers. 93 * 94 * @param v Maximum number of workers. 95 * @return Reference to this attribute object. 96 */ max_numjob_partner_attr97 inline job_partner_attr& max_num(uint64_t v) 98 { 99 this->max_num_ = v; 100 return *this; 101 } 102 103 /** 104 * @brief Set the ratio parameter for controlling the number of workers. 105 * 106 * @param v Ratio value. 107 * @return Reference to this attribute object. 108 */ ratiojob_partner_attr109 inline job_partner_attr& ratio(uint64_t v) 110 { 111 this->ratio_ = v; 112 return *this; 113 } 114 115 /** 116 * @brief Set the threshold parameter for controlling the number of workers. 117 * 118 * @param v Threshold value. 119 * @return Reference to this attribute object. 120 */ thresholdjob_partner_attr121 inline job_partner_attr& threshold(uint64_t v) 122 { 123 this->threshold_ = v; 124 return *this; 125 } 126 127 /** 128 * @brief Set last worker's retry busy time (in microseconds). 129 * 130 * @param us Busy wait time in microseconds. 131 * @return Reference to this attribute object. 132 */ busyjob_partner_attr133 inline job_partner_attr& busy(uint64_t us) 134 { 135 this->busy_us_ = us; 136 return *this; 137 } 138 139 /** 140 * @brief Set the depth of job queue. 141 * 142 * @param depth Queue depth. 143 * @return Reference to this attribute object. 144 */ queue_depthjob_partner_attr145 inline job_partner_attr& queue_depth(uint64_t depth) 146 { 147 this->queue_depth_ = depth; 148 return *this; 149 } 150 151 /** 152 * @brief Get QoS level. 153 * 154 * @return QoS value. 155 */ qosjob_partner_attr156 inline int qos() const 157 { 158 return this->qos_; 159 } 160 161 /** 162 * @brief Get max number of partner workers. 163 * 164 * @return Maximum number of workers. 165 */ max_numjob_partner_attr166 inline uint64_t max_num() const 167 { 168 return this->max_num_; 169 } 170 171 /** 172 * @brief Get the ratio parameter for controlling the number of workers. 173 * 174 * @return Ratio value. 175 */ ratiojob_partner_attr176 inline uint64_t ratio() const 177 { 178 return this->ratio_; 179 } 180 181 /** 182 * @brief Get the threshold parameter for controlling the number of workers. 183 * 184 * @return Threshold value. 185 */ thresholdjob_partner_attr186 inline uint64_t threshold() const 187 { 188 return this->threshold_; 189 } 190 191 /** 192 * @brief Get last worker's retry busy time (in microseconds). 193 * 194 * @return Busy wait time in microseconds. 195 */ busyjob_partner_attr196 inline uint64_t busy() const 197 { 198 return this->busy_us_; 199 } 200 201 /** 202 * @brief Get the depth of job queue. 203 * 204 * @return Queue depth. 205 */ queue_depthjob_partner_attr206 inline uint64_t queue_depth() const 207 { 208 return this->queue_depth_; 209 } 210 211 private: 212 int qos_ = ffrt::qos_user_initiated; ///< QoS level for the job partner. 213 uint64_t max_num_ = default_partner_max; ///< Maximum number of partner workers. 214 uint64_t ratio_ = default_partner_ratio; ///< Ratio for scaling the number of workers. 215 uint64_t threshold_ = default_partner_threshold; ///< Threshold for scaling the number of workers. 216 uint64_t busy_us_ = default_partner_delay_us; ///< Busy wait time (us) for the last worker before exit. 217 uint64_t queue_depth_ = default_q_depth; ///< Depth of the job queue. 218 219 static constexpr uint64_t default_partner_max = 2; ///< Default max number of partner workers. 220 static constexpr uint64_t default_partner_ratio = 20; ///< Default ratio for worker scaling. 221 static constexpr uint64_t default_partner_threshold = 0; ///< Default threshold for worker scaling. 222 static constexpr uint64_t default_partner_delay_us = 100; ///< Default busy wait time (us) for last worker. 223 static constexpr uint64_t default_q_depth = 1024; ///< Default depth of the job queue. 224 }; 225 226 /** 227 * @struct job_partner 228 * @brief Provide the function of submitting tasks and waiting for task completion. 229 * 230 * @tparam UsageId The user-defined job type. 231 * @since 20 232 */ 233 template <int UsageId = 0> 234 struct job_partner : ref_obj<job_partner<UsageId>>, detail::non_copyable { 235 /** 236 * @brief Retrieves the job_partner instance in the current thread. 237 * 238 * @param attr Job partner attributes. 239 * @return Reference to the job_partner instance. 240 * @since 20 241 */ 242 static __attribute__((noinline)) auto& get_partner_of_this_thread(const job_partner_attr& attr = {}) 243 { 244 static thread_local auto s = ref_obj<job_partner<UsageId>>::make(attr); 245 return s; 246 } 247 248 /** 249 * @brief Submits a suspendable job to the partner thread (blocking). 250 * 251 * This function submits a job that can be suspended and resumed, using the specified stack and stack size. 252 * The function is blocking: it will block the current thread until the job is successfully executed. 253 * It can be called from both master and non-master threads. If the queue is full, it will retry until successful. 254 * 255 * @tparam boost Indicates whether to dynamically add workers. 256 * @param suspendable_job The job executor function closure. 257 * @param stack Pointer to the stack memory for the job. 258 * @param stack_size Size of the stack memory. 259 * @return Returns 1 if job initialization failed (e.g., invalid stack_size); 0 if submission succeeded. 260 * @since 20 261 */ 262 template <bool boost = true> submitjob_partner263 inline int submit(std::function<void()>&& suspendable_job, void* stack, size_t stack_size) 264 { 265 auto p = job_t::init(std::forward<std::function<void()>>(suspendable_job), stack, stack_size); 266 if (p == nullptr) { 267 FFRT_API_LOGE("job initialize failed, maybe invalid stack_size"); 268 return 1; 269 } 270 FFRT_API_LOGD("submit %lu", p->id); 271 p->local().partner = this; 272 submit<boost>(suspendable_job_func, p); 273 return 0; 274 } 275 276 /** 277 * @brief Submits a non-suspendable job to the partner thread (non-blocking). 278 * 279 * This function submits a job that cannot be suspended. The function is non-blocking: 280 * it will not block the current thread, and the job will be asynchronously executed by a partner worker thread. 281 * If the queue is full, it will retry until successful. 282 * 283 * @tparam boost Indicates whether to dynamically add workers. 284 * @param non_suspendable_job The job executor function closure. 285 * @since 20 286 */ 287 template <bool boost = true> submitjob_partner288 inline void submit(std::function<void()>&& non_suspendable_job) 289 { 290 auto p = new non_suspendable_job_t(std::forward<std::function<void()>>(non_suspendable_job), this); 291 FFRT_API_LOGD("non-suspendable job submit: %p", p); 292 submit<boost>(non_suspendable_job_func, p); 293 } 294 295 /** 296 * @brief Submits a job to the master thread and suspends the current task until completion. 297 * 298 * This function submits a job to the master thread. The current task will be paused after submitting the closure, 299 * and will resume only after the master thread finishes executing the closure. If called outside a job context, 300 * the closure will be executed directly. If the queue is full, it will retry until successful. 301 * 302 * @param job The job executor function closure. 303 * @since 20 304 */ submit_to_masterjob_partner305 static inline void submit_to_master(std::function<void()>&& job) 306 { 307 auto& e = job_t::env(); 308 auto j = e.cur; 309 if (j == nullptr || j->local().partner == e.tl.token) { 310 return job(); 311 } 312 j->local().partner->submit_to_master(e, j, std::forward<std::function<void()>>(job)); 313 } 314 315 /** 316 * @brief Waits until all submitted tasks are complete. 317 * 318 * This function blocks the calling thread until all submitted jobs have finished execution. 319 * It can only be called from the master thread; calling from a non-master thread will fail. 320 * 321 * @tparam help_partner If true, the current thread will also consume jobs from the worker queue. 322 * @tparam busy_wait_us If the worker queue is empty, the current thread will busy-wait for 323 * this duration (in microseconds) before sleeping. 324 * If a job is submitted during this time, the thread will consume it. 325 * @return Returns 1 if called from a non-master thread (wait fails); 0 if wait succeeds. 326 * @since 20 327 */ 328 template<bool help_partner = true, uint64_t busy_wait_us = 100> waitjob_partner329 int wait() 330 { 331 if (!this_thread_is_master()) { 332 FFRT_API_LOGE("wait only can be called on master thread"); 333 return 1; 334 } 335 FFRT_API_TRACE_SCOPE("%s wait on master", name.c_str()); 336 FFRT_API_LOGD("wait on master"); 337 338 for (;;) { 339 _begin_consume_master_job: 340 int idx = 0; 341 while (master_q.try_run()) { 342 if (((++idx & 0xF) == 0) && partner_num.load(std::memory_order_relaxed) == 0) { 343 job_partner_task(); 344 } 345 } 346 347 auto concurrency = job_num.load(); 348 auto wn = partner_num.load(std::memory_order_relaxed); 349 if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) { 350 job_partner_task(); 351 } 352 if (help_partner && partner_q.try_run()) { 353 goto _begin_consume_master_job; 354 } 355 if (concurrency == 0) { 356 break; 357 } else { 358 auto s = clock::now(); 359 while (!help_partner && busy_wait_us > 0 && clock::ns(s) < busy_wait_us * 1000) { 360 if (master_q.try_run()) { 361 goto _begin_consume_master_job; 362 } 363 wfe(); 364 } 365 master_wait.wait(0); 366 master_wait = 0; 367 } 368 } 369 370 FFRT_API_LOGD("wait success"); 371 return 0; 372 } 373 374 /** 375 * @brief Judge whether the current thread is the job_partner master. 376 * 377 * @return true if the current thread is the job_partner master; false otherwise. 378 * @since 20 379 */ this_thread_is_masterjob_partner380 inline bool this_thread_is_master() 381 { 382 return job_t::env().tl.token == this; 383 } 384 385 private: 386 friend ref_obj<job_partner>; ///< Allows ref_obj to access private members for reference counting. 387 388 /** 389 * @brief Fiber-local storage structure for master function and partner pointer. 390 */ 391 struct fls { 392 std::function<void()> master_f; ///< Function to be executed by the master. 393 job_partner* partner; ///< Pointer to the associated job_partner instance. 394 }; 395 396 /** 397 * @brief Thread-local storage structure for token identification. 398 */ 399 struct tls { 400 void* token = nullptr; ///< Token used to identify the current job_partner instance. 401 }; 402 403 /** 404 * @brief Alias for the fiber type used by this job_partner. 405 */ 406 using job_t = fiber<UsageId, fls, tls>; 407 408 /** 409 * @brief Structure representing a non-suspendable job. 410 */ 411 struct non_suspendable_job_t { 412 /** 413 * @brief Constructs a non_suspendable_job_t object. 414 * 415 * @param fn Function to execute. 416 * @param p Pointer to the associated job_partner. 417 */ non_suspendable_job_tjob_partner::non_suspendable_job_t418 non_suspendable_job_t(std::function<void()>&& fn, job_partner* p) 419 : fn(std::forward<std::function<void()>>(fn)), partner(p) {} 420 421 std::function<void()> fn; ///< Function to execute. 422 job_partner* partner; ///< Pointer to the associated job_partner. 423 }; 424 425 /** 426 * @brief Constructs a job_partner object with the given attributes. 427 * 428 * @param attr Job partner attributes. 429 */ 430 job_partner(const job_partner_attr& attr = {}) 431 : name("partner<" + std::to_string(UsageId) + ">" + std::to_string(syscall(SYS_gettid))), 432 attr(attr), partner_q(attr.queue_depth(), name + "_pq"), master_q(attr.queue_depth(), name + "_mq") 433 { 434 concurrency_name = name + "_cc#"; 435 partner_num_name = name + "_p#"; 436 job_t::env().tl.token = this; 437 } 438 439 /** 440 * @brief Submits a job to the partner queue. 441 * 442 * @tparam boost Indicates whether to dynamically add workers. 443 * @param f Function pointer for the job. 444 * @param p Pointer to the job data. 445 */ 446 template <bool boost> submitjob_partner447 void submit(func_ptr f, void* p) 448 { 449 auto concurrency = job_num.fetch_add(1, std::memory_order_relaxed) + 1; 450 (void)(concurrency); 451 FFRT_API_TRACE_INT64(concurrency_name.c_str(), concurrency); 452 partner_q.template push<1>(f, p); 453 454 auto wn = partner_num.load(std::memory_order_relaxed); 455 if (boost || attr.threshold()) { 456 if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) { 457 job_partner_task(); 458 } 459 } else { 460 if (wn == 0) { 461 job_partner_task(); 462 } 463 } 464 } 465 466 /** 467 * @brief Submits a job to the master queue and suspends the current fiber. 468 * 469 * @tparam Env Environment type. 470 * @param e Reference to the environment. 471 * @param p Pointer to the job fiber. 472 * @param job Function to execute. 473 */ 474 template<class Env> submit_to_masterjob_partner475 void submit_to_master(Env& e, job_t* p, std::function<void()>&& job) 476 { 477 FFRT_API_LOGD("job %lu submit to master", (p ? p->id : -1UL)); 478 p->local().master_f = std::forward<std::function<void()>>(job); 479 p->suspend(e, submit_to_master_suspend_func); 480 } 481 482 /** 483 * @brief Suspend function used when submitting to master. 484 * 485 * @param p Pointer to the job fiber. 486 * @return True if suspension is successful. 487 */ submit_to_master_suspend_funcjob_partner488 static bool submit_to_master_suspend_func(void* p) 489 { 490 auto partner = (static_cast<job_t*>(p))->local().partner; 491 partner->master_q.template push<0>(master_run_func, p); 492 partner->notify_master(); 493 return true; 494 } 495 496 /** 497 * @brief Notifies the master thread to wake up if waiting. 498 */ notify_masterjob_partner499 inline void notify_master() 500 { 501 if (master_wait.exchange(1) == 0) { 502 master_wait.notify_one(); 503 } 504 } 505 506 /** 507 * @brief Launches a new partner worker task. 508 */ job_partner_taskjob_partner509 void job_partner_task() 510 { 511 auto partner_n = partner_num.fetch_add(1) + 1; 512 (void)(partner_n); 513 FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n); 514 FFRT_API_TRACE_SCOPE("%s add task", name.c_str()); 515 516 ref_obj<job_partner<UsageId>>::inc_ref(); 517 ffrt::submit([this] { 518 auto wn = partner_num.load(std::memory_order_relaxed); 519 if (wn < attr.max_num() && partner_q.size() > wn * attr.ratio() + attr.threshold()) { 520 job_partner_task(); 521 } 522 _re_run_partner: 523 while (partner_q.try_run()); 524 if (partner_num.load() == 1 && attr.busy() > 0) { // last partner delay 525 FFRT_API_TRACE_SCOPE("stall"); 526 auto s = clock::now(); 527 while (clock::ns(s) < attr.busy() * 1000) { 528 if (partner_q.try_run()) { 529 goto _re_run_partner; 530 } 531 wfe(); 532 } 533 } 534 auto partner_n = partner_num.fetch_sub(1) - 1; 535 (void)(partner_n); 536 FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n); 537 if (partner_q.try_run()) { 538 auto partner_n = partner_num.fetch_add(1) + 1; 539 (void)(partner_n); 540 FFRT_API_TRACE_INT64(partner_num_name.c_str(), partner_n); 541 goto _re_run_partner; 542 } 543 ref_obj<job_partner<UsageId>>::dec_ref(); 544 }, {}, {}, task_attr().qos(attr.qos()).name(name.c_str())); 545 } 546 547 /** 548 * @brief Function executed by a suspendable job. 549 * 550 * @param p_ Pointer to the job fiber. 551 */ suspendable_job_funcjob_partner552 static void suspendable_job_func(void* p_) 553 { 554 auto p = (static_cast<job_t*>(p_)); 555 FFRT_API_LOGD("run partner job %lu", p->id); 556 FFRT_API_TRACE_SCOPE("pjob%lu", p->id); 557 if (p->start()) { // job done 558 auto partner = p->local().partner; 559 auto concurrency = partner->job_num.fetch_sub(1, std::memory_order_acquire) - 1; 560 FFRT_API_TRACE_INT64(partner->concurrency_name.c_str(), concurrency); 561 if (concurrency == 0) { 562 partner->notify_master(); 563 } 564 p->destroy(); 565 } 566 } 567 568 /** 569 * @brief Function executed by a non-suspendable job. 570 * 571 * @param p_ Pointer to the non_suspendable_job_t object. 572 */ non_suspendable_job_funcjob_partner573 static void non_suspendable_job_func(void* p_) 574 { 575 auto p = static_cast<non_suspendable_job_t*>(p_); 576 FFRT_API_LOGD("run non-suspendable job %p", p); 577 FFRT_API_TRACE_SCOPE("nsjob"); 578 (p->fn)(); 579 auto partner = p->partner; 580 auto concurrency = partner->job_num.fetch_sub(1, std::memory_order_acquire) - 1; 581 FFRT_API_TRACE_INT64(partner->concurrency_name.c_str(), concurrency); 582 if (concurrency == 0) { 583 partner->notify_master(); 584 } 585 delete p; 586 } 587 588 /** 589 * @brief Function executed by the master to run a job. 590 * 591 * @param p_ Pointer to the job fiber. 592 */ master_run_funcjob_partner593 static void master_run_func(void* p_) 594 { 595 auto p = static_cast<job_t*>(p_); 596 { 597 FFRT_API_LOGD("run master job %lu", p->id); 598 FFRT_API_TRACE_SCOPE("mjob%lu", p->id); 599 p->local().master_f(); 600 p->local().master_f = nullptr; 601 } 602 p->local().partner->partner_q.template push<1>(suspendable_job_func, p); 603 } 604 605 std::string name; ///< Name of the job_partner instance. 606 std::string concurrency_name; ///< Name used for concurrency tracing. 607 std::string partner_num_name; ///< Name used for partner number tracing. 608 job_partner_attr attr; ///< Attributes for configuring the job_partner. 609 610 alignas(detail::cacheline_size) std::atomic_uint64_t partner_num{0}; ///< Number of active partner workers. 611 alignas(detail::cacheline_size) std::atomic_uint64_t job_num{0}; ///< Number of active jobs. 612 613 runnable_queue<mpmc_queue> partner_q; ///< Runnable queue for partner jobs. 614 runnable_queue<mpmc_queue> master_q; ///< Runnable queue for master jobs. 615 atomic_wait master_wait = 0; ///< Synchronization primitive for master waiting. 616 }; 617 618 } // namespace ffrt 619 620 #endif // FFRT_JOB_PARTNER_H 621 /** @} */