• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime图依赖并发(C)
2
3## 概述
4
5FFRT图依赖并发范式支持任务依赖和数据依赖两种方式构建任务依赖图。任务依赖图中每个节点表示一个任务,边表示任务之间的依赖关系。任务依赖分为输入依赖`in_deps`和输出依赖`out_deps`。
6
7构建任务依赖图的两种不同方式:
8
9- 当使用任务依赖方式构建任务依赖图时,使用任务句柄`handle`来对应一个任务对象。
10- 当使用数据依赖方式构建任务依赖图时,数据对象表达抽象为数据签名,每个数据签名唯一对应一个数据对象。
11
12### 任务依赖
13
14> **说明:**
15>
16> 当任务句柄出现在一个任务的`in_deps`中时,任务句柄对应的任务是该任务的前置任务;当任务句柄出现在一个任务的`out_deps`中时,任务句柄对应的任务是该任务的后继任务。
17
18任务依赖适用于任务之间有明确顺序或逻辑流程要求的场景,例如:
19
20- 顺序执行的任务,例如:先进行数据预处理任务,然后再进行模型训练任务。
21- 逻辑流程控制,例如:商品交易过程中,通常是先下单,然后是制作,最后是物流运输。
22- 多级任务链,例如:流媒体视频处理过程中,视频解析后可以进行视频转码和视频生成缩略图,然后是视频添加水印,最后是视频发布。
23
24### 数据依赖
25
26> **说明:**
27>
28> 当数据对象的签名出现在一个任务的`in_deps`中时,该任务称为数据对象的消费者任务,消费者任务执行不改变其输入数据对象的内容;
29> 当数据对象的签名出现在任务的`out_deps`中时,该任务称为数据对象的生产者任务,生产者任务执行改变其输出数据对象的内容,从而生成该数据对象的一个新的版本。
30
31数据依赖适用于任务之间通过数据生产和消费关系来触发执行的场景。
32
33一个数据对象可能存在多个版本,每个版本对应一个生产者任务和零个,一个或多个消费者任务,根据生产者任务和消费者任务的下发顺序定义数据对象的多个版本的顺序,以及每个版本所对应的生产者和消费者任务。
34
35数据依赖解除的任务进入就绪状态允许被调度执行,依赖解除状态指任务所有输入数据对象版本的生产者任务执行完成,且所有输出数据对象版本的所有消费者任务执行完成的状态。
36
37FFRT在运行时可动态构建任务之间的基于生产者/消费者的数据依赖关系并遵循任务数据依赖状态执行调度,包括:
38
39- Producer-Consumer依赖
40
41  一个数据对象版本的生产者任务和该数据对象版本的消费者任务之间形成的依赖关系,也称为Read-after-Write依赖。
42
43- Consumer-Producer依赖
44
45  一个数据对象版本的消费者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Read依赖。
46
47- Producer-Producer依赖
48
49  一个数据对象版本的生产者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Write依赖。
50
51例如,存在一组任务与数据A的关系表述为:
52
53```cpp
54task1(OUT A);
55task2(IN A);
56task3(IN A);
57task4(OUT A);
58task5(OUT A);
59```
60
61![image](figures/ffrt_figure3.png)
62
63为表述方便,本文中的数据流图均以圆圈表示Task,方块表示数据。
64
65可以得出以下结论:
66
67- task1与task2/task3构成Producer-Consumer依赖,即:task2/task3需要等到task1写完A之后才能读A。
68- task2/task3与task4构成Consumer-Producer依赖,即:task4需要等到task2/task3读完A之后才能写A。
69- task4与task5构成Producer-Producer依赖,即:task5需要等到task4写完A之后才能写A。
70
71## 示例:流媒体视频处理
72
73用户上传视频到流媒体平台,处理步骤包含:视频解析A、视频转码B、视频缩略图生成C、视频水印添加D和视频发布E,其中步骤B和步骤C可以并行执行。任务流程如下图所示:
74
75![image](figures/ffrt_figure1.png)
76
77借助FFRT提供了图依赖并发范式,可以描述任务依赖关系,同时并行化上述视频处理流程,代码如下所示:
78
79```c
80static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func,
81    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
82{
83    ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
84}
85
86static inline ffrt_task_handle_t ffrt_submit_h_c(ffrt_function_t func, const ffrt_function_t after_func,
87    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
88{
89    return ffrt_submit_h_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
90}
91
92void func_TaskA(void* arg)
93{
94    printf("视频解析\n");
95}
96
97void func_TaskB(void* arg)
98{
99    printf("视频转码\n");
100}
101
102void func_TaskC(void* arg)
103{
104    printf("视频生成缩略图\n");
105}
106
107void func_TaskD(void* arg)
108{
109    printf("视频添加水印\n");
110}
111
112void func_TaskE(void* arg)
113{
114    printf("视频发布\n");
115}
116
117int main()
118{
119    ffrt_task_handle_t hTaskA = ffrt_submit_h_c(func_TaskA, NULL, NULL, NULL, NULL, NULL);
120    const std::vector<ffrt_dependence_t> taskA_deps = {{ffrt_dependence_task, hTaskA}};
121    ffrt_deps_t dTaskA{static_cast<uint32_t>(taskA_deps.size()), taskA_deps.data()};
122
123    ffrt_task_handle_t hTaskB = ffrt_submit_h_c(func_TaskB, NULL, NULL, &dTaskA, NULL, NULL);
124    ffrt_task_handle_t hTaskC = ffrt_submit_h_c(func_TaskC, NULL, NULL, &dTaskA, NULL, NULL);
125
126    const std::vector<ffrt_dependence_t> taskBC_deps = {{ffrt_dependence_task, hTaskB}, {ffrt_dependence_task, hTaskC}};
127    ffrt_deps_t dTaskBC{static_cast<uint32_t>(taskBC_deps.size()), taskBC_deps.data()};
128
129    ffrt_task_handle_t hTaskD = ffrt_submit_h_c(func_TaskD, NULL, NULL, &dTaskBC, NULL, NULL);
130
131    const std::vector<ffrt_dependence_t> taskD_deps = {{ffrt_dependence_task, hTaskD}};
132    ffrt_deps_t dTaskD{static_cast<uint32_t>(taskD_deps.size()), taskD_deps.data()};
133
134    ffrt_submit_c(func_TaskE, NULL, NULL, &dTaskD, NULL, NULL);
135
136    ffrt_wait();
137    return 0;
138}
139```
140
141C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。
142
143```c
144typedef struct {
145    ffrt_function_header_t header;
146    ffrt_function_t func;
147    ffrt_function_t after_func;
148    void* arg;
149} c_function_t;
150
151static inline void ffrt_exec_function_wrapper(void* t)
152{
153    c_function_t* f = (c_function_t *)t;
154    if (f->func) {
155        f->func(f->arg);
156    }
157}
158
159static inline void ffrt_destroy_function_wrapper(void* t)
160{
161    c_function_t* f = (c_function_t *)t;
162    if (f->after_func) {
163        f->after_func(f->arg);
164    }
165}
166
167#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
168static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
169    const ffrt_function_t after_func, void *arg)
170{
171    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
172        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
173
174    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general);
175    f->header.exec = ffrt_exec_function_wrapper;
176    f->header.destroy = ffrt_destroy_function_wrapper;
177    f->func = func;
178    f->after_func = after_func;
179    f->arg = arg;
180    return (ffrt_function_header_t *)f;
181}
182```
183
184预期的输出可能为:
185
186```plain
187视频解析
188视频转码
189视频生成缩略图
190视频添加水印
191视频发布
192```
193
194## 示例:斐波那契数列
195
196斐波那契数列中每个数字是前两个数字之和,计算斐波那契数的过程可以很好地通过数据对象来表达任务依赖关系。使用FFRT并发编程框架计算斐波那契数的代码如下所示:
197
198```c
199#include <stdio.h>
200#include "ffrt.h"
201
202typedef struct {
203    int x;
204    int* y;
205} fib_ffrt_s;
206
207static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func,
208    void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr)
209{
210    ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr);
211}
212
213void fib_ffrt(void* arg)
214{
215    fib_ffrt_s* p = (fib_ffrt_s*)arg;
216    int x = p->x;
217    int* y = p->y;
218
219    if (x <= 1) {
220        *y = x;
221    } else {
222        int y1, y2;
223        fib_ffrt_s s1 = {x - 1, &y1};
224        fib_ffrt_s s2 = {x - 2, &y2};
225        const std::vector<ffrt_dependence_t> dx_deps = {{ffrt_dependence_data, &x}};
226        ffrt_deps_t dx{static_cast<uint32_t>(dx_deps.size()), dx_deps.data()};
227        const std::vector<ffrt_dependence_t> dy1_deps = {{ffrt_dependence_data, &y1}};
228        ffrt_deps_t dy1{static_cast<uint32_t>(dy1_deps.size()), dy1_deps.data()};
229        const std::vector<ffrt_dependence_t> dy2_deps = {{ffrt_dependence_data, &y2}};
230        ffrt_deps_t dy2{static_cast<uint32_t>(dy2_deps.size()), dy2_deps.data()};
231        const std::vector<ffrt_dependence_t> dy12_deps = {{ffrt_dependence_data, &y1}, {ffrt_dependence_data, &y2}};
232        ffrt_deps_t dy12{static_cast<uint32_t>(dy12_deps.size()), dy12_deps.data()};
233        ffrt_submit_c(fib_ffrt, NULL, &s1, &dx, &dy1, NULL);
234        ffrt_submit_c(fib_ffrt, NULL, &s2, &dx, &dy2, NULL);
235        ffrt_wait_deps(&dy12);
236        *y = y1 + y2;
237    }
238}
239
240int main(int narg, char** argv)
241{
242    int r;
243    fib_ffrt_s s = {5, &r};
244    const std::vector<ffrt_dependence_t> dr_deps = {{ffrt_dependence_data, &r}};
245    ffrt_deps_t dr{static_cast<uint32_t>(dr_deps.size()), dr_deps.data()};
246    ffrt_submit_c(fib_ffrt, NULL, &s, NULL, &dr, NULL);
247    ffrt_wait_deps(&dr);
248    printf("Fibonacci(5) is %d\n", r);
249    return 0;
250}
251```
252
253预期输出为:
254
255```plain
256Fibonacci(5) is 5
257```
258
259示例中将`fibonacci(x-1)`和`fibonacci(x-2)`作为两个任务提交给FFRT,在两个任务完成之后将结果进行累加。虽然单个任务只是拆分成两个子任务,但是子任务又可以继续进行拆分,因此整个计算图的并行度是非常高的。
260
261各个任务在FFRT内部形成了一颗调用树:
262
263![image](figures/ffrt_figure2.png)
264
265## 接口说明
266
267上述样例中涉及到主要的FFRT的接口包括:
268
269| 名称                                                             | 描述                                   |
270| ---------------------------------------------------------------- | -------------------------------------- |
271| [ffrt_submit_base](ffrt-api-guideline-c.md#ffrt_submit_base)     | 提交任务调度执行。                     |
272| [ffrt_submit_h_base](ffrt-api-guideline-c.md#ffrt_submit_h_base) | 提交任务调度执行并返回任务句柄。       |
273| [ffrt_wait_deps](ffrt-api-guideline-c.md#ffrt_wait_deps)         | 等待依赖的任务完成,当前任务开始执行。 |
274
275## 约束限制
276
277- 使用`ffrt_submit_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过8个。
278- 使用`ffrt_submit_h_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过7个。
279- 参数既作为输入依赖又作为输出依赖的时候,统计依赖数量时只统计一次,如输入依赖是`{&x}`,输出依赖也是`{&x}`,实际依赖的数量是1。
280