1# Function Flow Runtime并发队列(C) 2 3## 概述 4 5FFRT并发队列提供了设置任务优先级(Priority)和队列并发度的能力,使得队列中的任务能同时在多个线程上执行,获得更高的并行效果。 6 7- **队列并发度**:通过队列最大并发度设置,可以控制同一时刻同时执行的任务数量。这有助于避免任务并发过多对系统资源造成冲击,从而保证系统的稳定性和性能。 8- **任务优先级**:用户可以为每个任务设置优先级,不同的任务将严格按照优先级进行调度和执行。相同优先级的任务按照排队顺序执行,高优先级的任务将优先于低优先级的任务执行,确保关键任务能够及时处理。 9 10## 示例:银行服务系统 11 12举例实现一个银行服务系统,每个客户向系统提交一个服务请求,可以区分普通用户和VIP用户,VIP用户的服务请求可以优先得到执行。 13银行系统中有2个窗口,可以并行取出用户提交的服务请求办理。可以利用FFRT的并行队列范式做如下建模: 14 15- **排队逻辑**:并行队列。 16- **服务窗口**:并行队列的并发度,同时也对应FFRT Worker数量。 17- **用户等级**:并行队列任务优先级。 18 19实现代码如下所示: 20 21```c 22#include <stdio.h> 23#include <unistd.h> 24#include "ffrt/queue.h" 25#include "ffrt/task.h" 26 27ffrt_queue_t create_bank_system(const char *name, int concurrency) 28{ 29 ffrt_queue_attr_t queue_attr; 30 (void)ffrt_queue_attr_init(&queue_attr); 31 ffrt_queue_attr_set_max_concurrency(&queue_attr, concurrency); 32 33 // 创建一个并发队列 34 ffrt_queue_t queue = ffrt_queue_create(ffrt_queue_concurrent, name, &queue_attr); 35 36 // 队列创建完后需要销毁队列属性 37 ffrt_queue_attr_destroy(&queue_attr); 38 if (!queue) { 39 printf("create queue failed\n"); 40 return NULL; 41 } 42 43 printf("create bank system successfully\n"); 44 return queue; 45} 46 47void destroy_bank_system(ffrt_queue_t queue_handle) 48{ 49 ffrt_queue_destroy(queue_handle); 50 printf("destroy bank system successfully\n"); 51} 52 53void bank_business(void *arg) 54{ 55 usleep(100 * 1000); 56 const char *data = (const char *)arg; 57 printf("saving or withdraw for %s\n", data); 58} 59 60// 封装提交队列任务函数 61ffrt_task_handle_t commit_request(ffrt_queue_t bank, void (*func)(void *), const char *name, 62 ffrt_queue_priority_t level, int delay) 63{ 64 ffrt_task_attr_t task_attr; 65 (void)ffrt_task_attr_init(&task_attr); 66 ffrt_task_attr_set_name(&task_attr, name); 67 ffrt_task_attr_set_queue_priority(&task_attr, level); 68 ffrt_task_attr_set_delay(&task_attr, delay); 69 70 return ffrt_queue_submit_h(bank, ffrt_create_function_wrapper(func, NULL, name), &task_attr); 71} 72 73// 封装取消队列任务函数 74int cancel_request(ffrt_task_handle_t request) 75{ 76 return ffrt_queue_cancel(request); 77} 78 79// 封装等待队列任务函数 80void wait_for_request(ffrt_task_handle_t task) 81{ 82 ffrt_queue_wait(task); 83} 84 85int main() 86{ 87 ffrt_queue_t bank = create_bank_system("Bank", 2); 88 if (!bank) { 89 printf("create bank system failed\n"); 90 return -1; 91 } 92 commit_request(bank, bank_business, "customer1", ffrt_queue_priority_low, 0); 93 commit_request(bank, bank_business, "customer2", ffrt_queue_priority_low, 0); 94 commit_request(bank, bank_business, "customer3", ffrt_queue_priority_low, 0); 95 commit_request(bank, bank_business, "customer4", ffrt_queue_priority_low, 0); 96 97 // VIP享受更优先的服务 98 commit_request(bank, bank_business, "VIP", ffrt_queue_priority_high, 0); 99 100 ffrt_task_handle_t task = commit_request(bank, bank_business, "customer5", ffrt_queue_priority_low, 0); 101 ffrt_task_handle_t task_last = commit_request(bank, bank_business, "customer6", ffrt_queue_priority_low, 0); 102 103 // 取消客户5的服务 104 cancel_request(task); 105 106 // 等待所有的客户服务完成 107 wait_for_request(task_last); 108 destroy_bank_system(bank); 109 110 return 0; 111} 112``` 113 114C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。 115 116```c 117typedef struct { 118 ffrt_function_header_t header; 119 ffrt_function_t func; 120 ffrt_function_t after_func; 121 void* arg; 122} c_function_t; 123 124static inline void ffrt_exec_function_wrapper(void* t) 125{ 126 c_function_t* f = (c_function_t *)t; 127 if (f->func) { 128 f->func(f->arg); 129 } 130} 131 132static inline void ffrt_destroy_function_wrapper(void* t) 133{ 134 c_function_t* f = (c_function_t *)t; 135 if (f->after_func) { 136 f->after_func(f->arg); 137 } 138} 139 140#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1]) 141static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func, 142 const ffrt_function_t after_func, void *arg) 143{ 144 FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size, 145 size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size); 146 147 c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue); 148 f->header.exec = ffrt_exec_function_wrapper; 149 f->header.destroy = ffrt_destroy_function_wrapper; 150 f->func = func; 151 f->after_func = after_func; 152 f->arg = arg; 153 return (ffrt_function_header_t *)f; 154} 155``` 156 157## 接口说明 158 159上述样例中涉及到主要的FFRT的接口包括: 160 161| 名称 | 描述 | 162| -------------------------------------------------------------------------------------------------- | ---------------------- | 163| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create) | 创建队列。 | 164| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy) | 销毁队列。 | 165| [ffrt_task_attr_set_queue_priority](ffrt-api-guideline-c.md#ffrt_task_attr_set_queue_priority) | 设置队列任务优先级。 | 166| [ffrt_queue_attr_set_max_concurrency](ffrt-api-guideline-c.md#ffrt_queue_attr_set_max_concurrency) | 设置并发队列的并发度。 | 167 168## 约束限制 169 1701. `ffrt_queue_attr_t`必须先调用`ffrt_queue_attr_init`初始化后再设置/获取属性,不再使用后需要显式调用`ffrt_queue_attr_destroy`释放资源。 1712. `ffrt_queue_t`必须在进程退出前显式调用`ffrt_queue_destroy`释放资源。 1723. 并发队列最大并发度建议控制在合理范围内,配置过大超过Worker线程数没有意义,配置过小可能导致系统资源利用率不足。 173