• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime并发队列(C)
2
3## 概述
4
5FFRT并发队列提供了设置任务优先级(Priority)和队列并发度的能力,使得队列中的任务能同时在多个线程上执行,获得更高的并行效果。
6
7- **队列并发度**:通过队列最大并发度设置,可以控制同一时刻同时执行的任务数量。这有助于避免任务并发过多对系统资源造成冲击,从而保证系统的稳定性和性能。
8- **任务优先级**:用户可以为每个任务设置优先级,不同的任务将严格按照优先级进行调度和执行。相同优先级的任务按照排队顺序执行,高优先级的任务将优先于低优先级的任务执行,确保关键任务能够及时处理。
9
10## 示例:银行服务系统
11
12举例实现一个银行服务系统,每个客户向系统提交一个服务请求,可以区分普通用户和VIP用户,VIP用户的服务请求可以优先得到执行。
13银行系统中有2个窗口,可以并行取出用户提交的服务请求办理。可以利用FFRT的并行队列范式做如下建模:
14
15- **排队逻辑**:并行队列。
16- **服务窗口**:并行队列的并发度,同时也对应FFRT Worker数量。
17- **用户等级**:并行队列任务优先级。
18
19实现代码如下所示:
20
21```c
22#include <stdio.h>
23#include <unistd.h>
24#include "ffrt/queue.h"
25#include "ffrt/task.h"
26
27ffrt_queue_t create_bank_system(const char *name, int concurrency)
28{
29    ffrt_queue_attr_t queue_attr;
30    (void)ffrt_queue_attr_init(&queue_attr);
31    ffrt_queue_attr_set_max_concurrency(&queue_attr, concurrency);
32
33    // 创建一个并发队列
34    ffrt_queue_t queue = ffrt_queue_create(ffrt_queue_concurrent, name, &queue_attr);
35
36    // 队列创建完后需要销毁队列属性
37    ffrt_queue_attr_destroy(&queue_attr);
38    if (!queue) {
39        printf("create queue failed\n");
40        return NULL;
41    }
42
43    printf("create bank system successfully\n");
44    return queue;
45}
46
47void destroy_bank_system(ffrt_queue_t queue_handle)
48{
49    ffrt_queue_destroy(queue_handle);
50    printf("destroy bank system successfully\n");
51}
52
53void bank_business(void *arg)
54{
55    usleep(100 * 1000);
56    const char *data = (const char *)arg;
57    printf("saving or withdraw for %s\n", data);
58}
59
60// 封装提交队列任务函数
61ffrt_task_handle_t commit_request(ffrt_queue_t bank, void (*func)(void *), const char *name,
62    ffrt_queue_priority_t level, int delay)
63{
64    ffrt_task_attr_t task_attr;
65    (void)ffrt_task_attr_init(&task_attr);
66    ffrt_task_attr_set_name(&task_attr, name);
67    ffrt_task_attr_set_queue_priority(&task_attr, level);
68    ffrt_task_attr_set_delay(&task_attr, delay);
69
70    return ffrt_queue_submit_h(bank, ffrt_create_function_wrapper(func, NULL, name), &task_attr);
71}
72
73// 封装取消队列任务函数
74int cancel_request(ffrt_task_handle_t request)
75{
76    return ffrt_queue_cancel(request);
77}
78
79// 封装等待队列任务函数
80void wait_for_request(ffrt_task_handle_t task)
81{
82    ffrt_queue_wait(task);
83}
84
85int main()
86{
87    ffrt_queue_t bank = create_bank_system("Bank", 2);
88    if (!bank) {
89        printf("create bank system failed\n");
90        return -1;
91    }
92    commit_request(bank, bank_business, "customer1", ffrt_queue_priority_low, 0);
93    commit_request(bank, bank_business, "customer2", ffrt_queue_priority_low, 0);
94    commit_request(bank, bank_business, "customer3", ffrt_queue_priority_low, 0);
95    commit_request(bank, bank_business, "customer4", ffrt_queue_priority_low, 0);
96
97    // VIP享受更优先的服务
98    commit_request(bank, bank_business, "VIP", ffrt_queue_priority_high, 0);
99
100    ffrt_task_handle_t task = commit_request(bank, bank_business, "customer5", ffrt_queue_priority_low, 0);
101    ffrt_task_handle_t task_last = commit_request(bank, bank_business, "customer6", ffrt_queue_priority_low, 0);
102
103    // 取消客户5的服务
104    cancel_request(task);
105
106    // 等待所有的客户服务完成
107    wait_for_request(task_last);
108    destroy_bank_system(bank);
109
110    return 0;
111}
112```
113
114C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。
115
116```c
117typedef struct {
118    ffrt_function_header_t header;
119    ffrt_function_t func;
120    ffrt_function_t after_func;
121    void* arg;
122} c_function_t;
123
124static inline void ffrt_exec_function_wrapper(void* t)
125{
126    c_function_t* f = (c_function_t *)t;
127    if (f->func) {
128        f->func(f->arg);
129    }
130}
131
132static inline void ffrt_destroy_function_wrapper(void* t)
133{
134    c_function_t* f = (c_function_t *)t;
135    if (f->after_func) {
136        f->after_func(f->arg);
137    }
138}
139
140#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
141static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
142    const ffrt_function_t after_func, void *arg)
143{
144    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
145        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
146
147    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue);
148    f->header.exec = ffrt_exec_function_wrapper;
149    f->header.destroy = ffrt_destroy_function_wrapper;
150    f->func = func;
151    f->after_func = after_func;
152    f->arg = arg;
153    return (ffrt_function_header_t *)f;
154}
155```
156
157## 接口说明
158
159上述样例中涉及到主要的FFRT的接口包括:
160
161| 名称                                                                                               | 描述                   |
162| -------------------------------------------------------------------------------------------------- | ---------------------- |
163| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create)                                     | 创建队列。             |
164| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy)                                   | 销毁队列。             |
165| [ffrt_task_attr_set_queue_priority](ffrt-api-guideline-c.md#ffrt_task_attr_set_queue_priority)     | 设置队列任务优先级。   |
166| [ffrt_queue_attr_set_max_concurrency](ffrt-api-guideline-c.md#ffrt_queue_attr_set_max_concurrency) | 设置并发队列的并发度。 |
167
168## 约束限制
169
1701. `ffrt_queue_attr_t`必须先调用`ffrt_queue_attr_init`初始化后再设置/获取属性,不再使用后需要显式调用`ffrt_queue_attr_destroy`释放资源。
1712. `ffrt_queue_t`必须在进程退出前显式调用`ffrt_queue_destroy`释放资源。
1723. 并发队列最大并发度建议控制在合理范围内,配置过大超过Worker线程数没有意义,配置过小可能导致系统资源利用率不足。
173