• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime图依赖并发(C)
2
3## 概述
4
5FFRT图依赖并发范式支持任务依赖和数据依赖两种方式构建任务依赖图。任务依赖图中每个节点表示一个任务,边表示任务之间的依赖关系。任务依赖分为输入依赖`in_deps`和输出依赖`out_deps`。
6
7构建任务依赖图的两种不同方式:
8
9- 当使用任务依赖方式构建任务依赖图时,使用任务句柄`handle`来对应一个任务对象。
10- 当使用数据依赖方式构建任务依赖图时,数据对象表达抽象为数据签名,每个数据签名唯一对应一个数据对象。
11
12### 任务依赖
13
14> **说明:**
15>
16> 当任务句柄出现在一个任务的`in_deps`中时,任务句柄对应的任务是该任务的前置任务;当任务句柄出现在一个任务的`out_deps`中时,任务句柄对应的任务是该任务的后继任务。
17
18任务依赖适用于任务之间有明确顺序或逻辑流程要求的场景,例如:
19
20- 顺序执行的任务,例如:先进行数据预处理任务,然后再进行模型训练任务。
21- 逻辑流程控制,例如:商品交易过程中,通常是先下单,然后是制作,最后是物流运输。
22- 多级任务链,例如:流媒体视频处理过程中,视频解析后可以进行视频转码和视频生成缩略图,然后是视频添加水印,最后是视频发布。
23
24### 数据依赖
25
26> **说明:**
27>
28> 当数据对象的签名出现在一个任务的`in_deps`中时,该任务称为数据对象的消费者任务,消费者任务执行不改变其输入数据对象的内容;
29> 当数据对象的签名出现在任务的`out_deps`中时,该任务称为数据对象的生产者任务,生产者任务执行改变其输出数据对象的内容,从而生成该数据对象的一个新的版本。
30
31数据依赖适用于任务之间通过数据生产和消费关系来触发执行的场景。
32
33一个数据对象可能存在多个版本,每个版本对应一个生产者任务和零个,一个或多个消费者任务,根据生产者任务和消费者任务的下发顺序定义数据对象的多个版本的顺序,以及每个版本所对应的生产者和消费者任务。
34
35数据依赖解除的任务进入就绪状态允许被调度执行,依赖解除状态指任务所有输入数据对象版本的生产者任务执行完成,且所有输出数据对象版本的所有消费者任务执行完成的状态。
36
37FFRT在运行时可动态构建任务之间的基于生产者/消费者的数据依赖关系并遵循任务数据依赖状态执行调度,包括:
38
39- Producer-Consumer依赖
40
41  一个数据对象版本的生产者任务和该数据对象版本的消费者任务之间形成的依赖关系,也称为Read-after-Write依赖。
42
43- Consumer-Producer依赖
44
45  一个数据对象版本的消费者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Read依赖。
46
47- Producer-Producer依赖
48
49  一个数据对象版本的生产者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Write依赖。
50
51例如,存在一组任务与数据A的关系表述为:
52
53```cpp
54task1(OUT A);
55task2(IN A);
56task3(IN A);
57task4(OUT A);
58task5(OUT A);
59```
60
61![image](figures/ffrt_figure3.png)
62
63为表述方便,本文中的数据流图均以圆圈表示Task,方块表示数据。
64
65可以得出以下结论:
66
67- task1与task2/task3构成Producer-Consumer依赖,即:task2/task3需要等到task1写完A之后才能读A。
68- task2/task3与task4构成Consumer-Producer依赖,即:task4需要等到task2/task3读完A之后才能写A。
69- task4与task5构成Producer-Producer依赖,即:task5需要等到task4写完A之后才能写A。
70
71## 示例:流媒体视频处理
72
73用户上传视频到流媒体平台,处理步骤包含:视频解析A、视频转码B、视频缩略图生成C、视频水印添加D和视频发布E,其中步骤B和步骤C可以并行执行。任务流程如下图所示:
74
75![image](figures/ffrt_figure1.png)
76
77借助FFRT提供了图依赖并发范式,可以描述任务依赖关系,同时并行化上述视频处理流程,代码如下所示:
78
79```c
80#include <stdio.h>
81#include "ffrt/task.h"
82
83static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func,
84    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
85{
86    ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
87}
88
89static inline ffrt_task_handle_t ffrt_submit_h_c(ffrt_function_t func, const ffrt_function_t after_func,
90    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
91{
92    return ffrt_submit_h_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
93}
94
95void func_TaskA(void* arg)
96{
97    printf("视频解析\n");
98}
99
100void func_TaskB(void* arg)
101{
102    printf("视频转码\n");
103}
104
105void func_TaskC(void* arg)
106{
107    printf("视频生成缩略图\n");
108}
109
110void func_TaskD(void* arg)
111{
112    printf("视频添加水印\n");
113}
114
115void func_TaskE(void* arg)
116{
117    printf("视频发布\n");
118}
119
120int main()
121{
122    // 提交任务A
123    ffrt_task_handle_t hTaskA = ffrt_submit_h_c(func_TaskA, NULL, NULL, NULL, NULL, NULL);
124
125    // 提交任务B和C
126    ffrt_dependence_t taskA_deps[] = {{ffrt_dependence_task, hTaskA}};
127    ffrt_deps_t dTaskA = {1, taskA_deps};
128    ffrt_task_handle_t hTaskB = ffrt_submit_h_c(func_TaskB, NULL, NULL, &dTaskA, NULL, NULL);
129    ffrt_task_handle_t hTaskC = ffrt_submit_h_c(func_TaskC, NULL, NULL, &dTaskA, NULL, NULL);
130
131    // 提交任务D
132    ffrt_dependence_t taskBC_deps[] = {{ffrt_dependence_task, hTaskB}, {ffrt_dependence_task, hTaskC}};
133    ffrt_deps_t dTaskBC = {2, taskBC_deps};
134    ffrt_task_handle_t hTaskD = ffrt_submit_h_c(func_TaskD, NULL, NULL, &dTaskBC, NULL, NULL);
135
136    // 提交任务E
137    ffrt_dependence_t taskD_deps[] = {{ffrt_dependence_task, hTaskD}};
138    ffrt_deps_t dTaskD = {1, taskD_deps};
139    ffrt_submit_c(func_TaskE, NULL, NULL, &dTaskD, NULL, NULL);
140
141    // 等待所有任务完成
142    ffrt_wait();
143    return 0;
144}
145```
146
147C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。
148
149```c
150typedef struct {
151    ffrt_function_header_t header;
152    ffrt_function_t func;
153    ffrt_function_t after_func;
154    void* arg;
155} c_function_t;
156
157static inline void ffrt_exec_function_wrapper(void* t)
158{
159    c_function_t* f = (c_function_t *)t;
160    if (f->func) {
161        f->func(f->arg);
162    }
163}
164
165static inline void ffrt_destroy_function_wrapper(void* t)
166{
167    c_function_t* f = (c_function_t *)t;
168    if (f->after_func) {
169        f->after_func(f->arg);
170    }
171}
172
173#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
174static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
175    const ffrt_function_t after_func, void *arg)
176{
177    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
178        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
179
180    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general);
181    f->header.exec = ffrt_exec_function_wrapper;
182    f->header.destroy = ffrt_destroy_function_wrapper;
183    f->func = func;
184    f->after_func = after_func;
185    f->arg = arg;
186    return (ffrt_function_header_t *)f;
187}
188```
189
190预期的输出可能为:
191
192```plain
193视频解析
194视频转码
195视频生成缩略图
196视频添加水印
197视频发布
198```
199
200## 示例:斐波那契数列
201
202斐波那契数列中每个数字是前两个数字之和,计算斐波那契数的过程可以很好地通过数据对象来表达任务依赖关系。使用FFRT并发编程框架计算斐波那契数的代码如下所示:
203
204```c
205#include <stdio.h>
206#include "ffrt/task.h"
207
208typedef struct {
209    int x;
210    int* y;
211} fib_ffrt_s;
212
213static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func,
214    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
215{
216    ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
217}
218
219void fib_ffrt(void* arg)
220{
221    fib_ffrt_s* p = (fib_ffrt_s*)arg;
222    int x = p->x;
223    int* y = p->y;
224
225    if (x <= 1) {
226        *y = x;
227    } else {
228        int y1, y2;
229        fib_ffrt_s s1 = {x - 1, &y1};
230        fib_ffrt_s s2 = {x - 2, &y2};
231
232        // 构建数据依赖
233        ffrt_dependence_t dx_deps[] = {{ffrt_dependence_data, &x}};
234        ffrt_deps_t dx = {1, dx_deps};
235        ffrt_dependence_t dy1_deps[] = {{ffrt_dependence_data, &y1}};
236        ffrt_deps_t dy1 = {1, dy1_deps};
237        ffrt_dependence_t dy2_deps[] = {{ffrt_dependence_data, &y2}};
238        ffrt_deps_t dy2 = {1, dy2_deps};
239        ffrt_dependence_t dy12_deps[] = {{ffrt_dependence_data, &y1}, {ffrt_dependence_data, &y2}};
240        ffrt_deps_t dy12 = {2, dy12_deps};
241
242        // 分别提交任务
243        ffrt_submit_c(fib_ffrt, NULL, &s1, &dx, &dy1, NULL);
244        ffrt_submit_c(fib_ffrt, NULL, &s2, &dx, &dy2, NULL);
245
246        // 等待任务完成
247        ffrt_wait_deps(&dy12);
248        *y = y1 + y2;
249    }
250}
251
252int main()
253{
254    int r;
255    fib_ffrt_s s = {5, &r};
256    ffrt_dependence_t dr_deps[] = {{ffrt_dependence_data, &r}};
257    ffrt_deps_t dr = {1, dr_deps};
258    ffrt_submit_c(fib_ffrt, NULL, &s, NULL, &dr, NULL);
259
260    // 等待任务完成
261    ffrt_wait_deps(&dr);
262    printf("Fibonacci(5) is %d\n", r);
263    return 0;
264}
265```
266
267预期输出为:
268
269```plain
270Fibonacci(5) is 5
271```
272
273示例中将`fibonacci(x-1)`和`fibonacci(x-2)`作为两个任务提交给FFRT,在两个任务完成之后将结果进行累加。虽然单个任务只是拆分成两个子任务,但是子任务又可以继续进行拆分,因此整个计算图的并行度是非常高的。
274
275各个任务在FFRT内部形成了一颗调用树:
276
277![image](figures/ffrt_figure2.png)
278
279## 接口说明
280
281上述样例中涉及到主要的FFRT的接口包括:
282
283| 名称                                                             | 描述                                   |
284| ---------------------------------------------------------------- | -------------------------------------- |
285| [ffrt_submit_base](ffrt-api-guideline-c.md#ffrt_submit_base)     | 提交任务调度执行。                     |
286| [ffrt_submit_h_base](ffrt-api-guideline-c.md#ffrt_submit_h_base) | 提交任务调度执行并返回任务句柄。       |
287| [ffrt_wait_deps](ffrt-api-guideline-c.md#ffrt_wait_deps)         | 等待依赖的任务完成,当前任务开始执行。 |
288
289## 约束限制
290
291- 使用`ffrt_submit_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过8个。
292- 使用`ffrt_submit_h_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过7个。
293- 参数既作为输入依赖又作为输出依赖的时候,统计依赖数量时只统计一次,如输入依赖是`{&x}`,输出依赖也是`{&x}`,实际依赖的数量是1。
294