1# Function Flow Runtime并发队列(C) 2 3## 概述 4 5FFRT并发队列提供了设置任务优先级(Priority)和队列并发度的能力,使得队列中的任务能同时在多个线程上执行,获得更高的并行效果。 6 7- **队列并发度**:通过队列最大并发度设置,可以控制同一时刻同时执行的任务数量。这有助于避免任务并发过多对系统资源造成冲击,从而保证系统的稳定性和性能。 8- **任务优先级**:用户可以为每个任务设置优先级,不同的任务将严格按照优先级进行调度和执行。相同优先级的任务按照排队顺序执行,高优先级的任务将优先于低优先级的任务执行,确保关键任务能够及时处理。 9 10## 示例:银行服务系统 11 12举例实现一个银行服务系统,每个客户向系统提交一个服务请求,可以区分普通用户和VIP用户,VIP用户的服务请求可以优先得到执行。 13银行系统中有2个窗口,可以并行取出用户提交的服务请求办理。可以利用FFRT的并行队列范式做如下建模: 14 15- **排队逻辑**:并行队列。 16- **服务窗口**:并行队列的并发度,同时也对应FFRT Worker数量。 17- **用户等级**:并行队列任务优先级。 18 19实现代码如下所示: 20 21```c 22#include <stdio.h> 23#include <string.h> 24#include <ffrt.h> 25 26ffrt_queue_t create_bank_system(const char *name, int concurrency) 27{ 28 ffrt_queue_attr_t queue_attr; 29 (void)ffrt_queue_attr_init(&queue_attr); 30 ffrt_queue_attr_set_max_concurrency(&queue_attr, 4); 31 32 // 创建一个并发队列 33 ffrt_queue_t queue = ffrt_queue_create(ffrt_queue_concurrent, name, &queue_attr); 34 35 // 队列创建完后需要销毁队列属性 36 ffrt_queue_attr_destroy(&queue_attr); 37 if (!queue) { 38 printf("create queue failed\n"); 39 return NULL; 40 } 41 42 printf("create bank system successful\n"); 43 return queue; 44} 45 46void destory_bank_system(ffrt_queue_t queue_handle) 47{ 48 ffrt_queue_destroy(queue_handle); 49 printf("destory bank system successful\n"); 50} 51 52void bank_business(void *arg) 53{ 54 usleep(100 * 1000); 55 const char *data = (const char *)arg; 56 printf("saving or withdraw for %s\n", data); 57} 58 59// 封装提交队列任务函数 60ffrt_task_handle_t commit_request(ffrt_queue_t bank, void (*func)(void *), char *name, ffrt_queue_priority_t level, int delay) 61{ 62 ffrt_task_attr_t task_attr; 63 (void)ffrt_task_attr_init(&task_attr); 64 ffrt_task_attr_set_name(&task_attr, name); 65 ffrt_task_attr_set_queue_priority(&task_attr, level); 66 ffrt_task_attr_set_delay(&task_attr, delay); 67 68 return ffrt_queue_submit_h(bank, ffrt_create_function_wrapper(func, NULL, name), &task_attr); 69} 70 71// 封装取消队列任务函数 72int cancel_request(ffrt_task_handle_t request) 73{ 74 return ffrt_queue_cancel(request); 75} 76 77int get_bank_queue_size(ffrt_queue_t bank) 78{ 79 return ffrt_queue_get_task_cnt(bank); 80} 81 82// 封装等待队列任务函数 83void wait_for_request(ffrt_task_handle_t task) 84{ 85 ffrt_queue_wait(task); 86} 87 88int main() 89{ 90 ffrt_queue_t bank = create_bank_system("Bank", 2); 91 if (!bank) { 92 printf("create bank system failed"); 93 return -1; 94 } 95 commit_request(bank, bank_business, "customer1", ffrt_queue_priority_low, 0); 96 commit_request(bank, bank_business, "customer2", ffrt_queue_priority_low, 0); 97 commit_request(bank, bank_business, "customer3", ffrt_queue_priority_low, 0); 98 commit_request(bank, bank_business, "customer4", ffrt_queue_priority_low, 0); 99 100 // VIP享受更优先的服务 101 commit_request(bank, bank_business, "VIP", ffrt_queue_priority_high, 0); 102 103 ffrt_task_handle_t task = commit_request(bank, bank_business, "customer5", ffrt_queue_priority_low, 0); 104 ffrt_task_handle_t task_last = commit_request(bank, bank_business, "customer6", ffrt_queue_priority_low, 0); 105 106 // 取消客户5的服务 107 cancel_request(task); 108 109 printf("bank current serving for %d customers\n", get_bank_queue_size(bank)); 110 111 // 等待所有的客户服务完成 112 wait_for_request(task_last); 113 destory_bank_system(bank); 114 115 return 0; 116} 117``` 118 119C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。 120 121```c 122typedef struct { 123 ffrt_function_header_t header; 124 ffrt_function_t func; 125 ffrt_function_t after_func; 126 void* arg; 127} c_function_t; 128 129static inline void ffrt_exec_function_wrapper(void* t) 130{ 131 c_function_t* f = (c_function_t *)t; 132 if (f->func) { 133 f->func(f->arg); 134 } 135} 136 137static inline void ffrt_destroy_function_wrapper(void* t) 138{ 139 c_function_t* f = (c_function_t *)t; 140 if (f->after_func) { 141 f->after_func(f->arg); 142 } 143} 144 145#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1]) 146static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func, 147 const ffrt_function_t after_func, void *arg) 148{ 149 FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size, 150 size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size); 151 152 c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue); 153 f->header.exec = ffrt_exec_function_wrapper; 154 f->header.destroy = ffrt_destroy_function_wrapper; 155 f->func = func; 156 f->after_func = after_func; 157 f->arg = arg; 158 return (ffrt_function_header_t *)f; 159} 160``` 161 162## 接口说明 163 164上述样例中涉及到主要的FFRT的接口包括: 165 166| 名称 | 描述 | 167| -------------------------------------------------------------------------------------------------- | ---------------------- | 168| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create) | 创建队列。 | 169| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy) | 销毁队列。 | 170| [ffrt_task_attr_set_queue_priority](ffrt-api-guideline-c.md#ffrt_task_attr_set_queue_priority) | 设置队列任务优先级。 | 171| [ffrt_queue_attr_set_max_concurrency](ffrt-api-guideline-c.md#ffrt_queue_attr_set_max_concurrency) | 设置并发队列的并发度。 | 172 173## 约束限制 174 1751. `ffrt_queue_attr_t`必须先调用`ffrt_queue_attr_init`初始化后再设置/获取属性,不再使用后需要显式调用`ffrt_queue_attr_destroy`释放资源。 1762. `ffrt_queue_t`必须在进程退出前显式调用`ffrt_queue_destroy`释放资源。 1773. 并发队列最大并发度建议控制在合理范围内,配置过大超过Worker线程数没有意义,配置过小可能导致系统资源利用率不足。 178