• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime并发队列(C)
2
3## 概述
4
5FFRT并发队列提供了设置任务优先级(Priority)和队列并发度的能力,使得队列中的任务能同时在多个线程上执行,获得更高的并行效果。
6
7- **队列并发度**:通过队列最大并发度设置,可以控制同一时刻同时执行的任务数量。这有助于避免任务并发过多对系统资源造成冲击,从而保证系统的稳定性和性能。
8- **任务优先级**:用户可以为每个任务设置优先级,不同的任务将严格按照优先级进行调度和执行。相同优先级的任务按照排队顺序执行,高优先级的任务将优先于低优先级的任务执行,确保关键任务能够及时处理。
9
10## 示例:银行服务系统
11
12举例实现一个银行服务系统,每个客户向系统提交一个服务请求,可以区分普通用户和VIP用户,VIP用户的服务请求可以优先得到执行。
13银行系统中有2个窗口,可以并行取出用户提交的服务请求办理。可以利用FFRT的并行队列范式做如下建模:
14
15- **排队逻辑**:并行队列。
16- **服务窗口**:并行队列的并发度,同时也对应FFRT Worker数量。
17- **用户等级**:并行队列任务优先级。
18
19实现代码如下所示:
20
21```c
22#include <stdio.h>
23#include <string.h>
24#include <ffrt.h>
25
26ffrt_queue_t create_bank_system(const char *name, int concurrency)
27{
28    ffrt_queue_attr_t queue_attr;
29    (void)ffrt_queue_attr_init(&queue_attr);
30    ffrt_queue_attr_set_max_concurrency(&queue_attr, 4);
31
32    // 创建一个并发队列
33    ffrt_queue_t queue = ffrt_queue_create(ffrt_queue_concurrent, name, &queue_attr);
34
35    // 队列创建完后需要销毁队列属性
36    ffrt_queue_attr_destroy(&queue_attr);
37    if (!queue) {
38        printf("create queue failed\n");
39        return NULL;
40    }
41
42    printf("create bank system successful\n");
43    return queue;
44}
45
46void destory_bank_system(ffrt_queue_t queue_handle)
47{
48    ffrt_queue_destroy(queue_handle);
49    printf("destory bank system successful\n");
50}
51
52void bank_business(void *arg)
53{
54    usleep(100 * 1000);
55    const char *data = (const char *)arg;
56    printf("saving or withdraw for %s\n", data);
57}
58
59// 封装提交队列任务函数
60ffrt_task_handle_t commit_request(ffrt_queue_t bank, void (*func)(void *), char *name, ffrt_queue_priority_t level, int delay)
61{
62    ffrt_task_attr_t task_attr;
63    (void)ffrt_task_attr_init(&task_attr);
64    ffrt_task_attr_set_name(&task_attr, name);
65    ffrt_task_attr_set_queue_priority(&task_attr, level);
66    ffrt_task_attr_set_delay(&task_attr, delay);
67
68    return ffrt_queue_submit_h(bank, ffrt_create_function_wrapper(func, NULL, name), &task_attr);
69}
70
71// 封装取消队列任务函数
72int cancel_request(ffrt_task_handle_t request)
73{
74    return ffrt_queue_cancel(request);
75}
76
77int get_bank_queue_size(ffrt_queue_t bank)
78{
79    return ffrt_queue_get_task_cnt(bank);
80}
81
82// 封装等待队列任务函数
83void wait_for_request(ffrt_task_handle_t task)
84{
85    ffrt_queue_wait(task);
86}
87
88int main()
89{
90    ffrt_queue_t bank = create_bank_system("Bank", 2);
91    if (!bank) {
92        printf("create bank system failed");
93        return -1;
94    }
95    commit_request(bank, bank_business, "customer1", ffrt_queue_priority_low, 0);
96    commit_request(bank, bank_business, "customer2", ffrt_queue_priority_low, 0);
97    commit_request(bank, bank_business, "customer3", ffrt_queue_priority_low, 0);
98    commit_request(bank, bank_business, "customer4", ffrt_queue_priority_low, 0);
99
100    // VIP享受更优先的服务
101    commit_request(bank, bank_business, "VIP", ffrt_queue_priority_high, 0);
102
103    ffrt_task_handle_t task = commit_request(bank, bank_business, "customer5", ffrt_queue_priority_low, 0);
104    ffrt_task_handle_t task_last = commit_request(bank, bank_business, "customer6", ffrt_queue_priority_low, 0);
105
106    // 取消客户5的服务
107    cancel_request(task);
108
109    printf("bank current serving for %d customers\n", get_bank_queue_size(bank));
110
111    // 等待所有的客户服务完成
112    wait_for_request(task_last);
113    destory_bank_system(bank);
114
115    return 0;
116}
117```
118
119C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。
120
121```c
122typedef struct {
123    ffrt_function_header_t header;
124    ffrt_function_t func;
125    ffrt_function_t after_func;
126    void* arg;
127} c_function_t;
128
129static inline void ffrt_exec_function_wrapper(void* t)
130{
131    c_function_t* f = (c_function_t *)t;
132    if (f->func) {
133        f->func(f->arg);
134    }
135}
136
137static inline void ffrt_destroy_function_wrapper(void* t)
138{
139    c_function_t* f = (c_function_t *)t;
140    if (f->after_func) {
141        f->after_func(f->arg);
142    }
143}
144
145#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
146static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
147    const ffrt_function_t after_func, void *arg)
148{
149    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
150        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
151
152    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue);
153    f->header.exec = ffrt_exec_function_wrapper;
154    f->header.destroy = ffrt_destroy_function_wrapper;
155    f->func = func;
156    f->after_func = after_func;
157    f->arg = arg;
158    return (ffrt_function_header_t *)f;
159}
160```
161
162## 接口说明
163
164上述样例中涉及到主要的FFRT的接口包括:
165
166| 名称                                                                                               | 描述                   |
167| -------------------------------------------------------------------------------------------------- | ---------------------- |
168| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create)                                     | 创建队列。             |
169| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy)                                   | 销毁队列。             |
170| [ffrt_task_attr_set_queue_priority](ffrt-api-guideline-c.md#ffrt_task_attr_set_queue_priority)     | 设置队列任务优先级。   |
171| [ffrt_queue_attr_set_max_concurrency](ffrt-api-guideline-c.md#ffrt_queue_attr_set_max_concurrency) | 设置并发队列的并发度。 |
172
173## 约束限制
174
1751. `ffrt_queue_attr_t`必须先调用`ffrt_queue_attr_init`初始化后再设置/获取属性,不再使用后需要显式调用`ffrt_queue_attr_destroy`释放资源。
1762. `ffrt_queue_t`必须在进程退出前显式调用`ffrt_queue_destroy`释放资源。
1773. 并发队列最大并发度建议控制在合理范围内,配置过大超过Worker线程数没有意义,配置过小可能导致系统资源利用率不足。
178