1# Function Flow Runtime图依赖并发(C) 2 3## 概述 4 5FFRT图依赖并发范式支持任务依赖和数据依赖两种方式构建任务依赖图。任务依赖图中每个节点表示一个任务,边表示任务之间的依赖关系。任务依赖分为输入依赖`in_deps`和输出依赖`out_deps`。 6 7构建任务依赖图的两种不同方式: 8 9- 当使用任务依赖方式构建任务依赖图时,使用任务句柄`handle`来对应一个任务对象。 10- 当使用数据依赖方式构建任务依赖图时,数据对象表达抽象为数据签名,每个数据签名唯一对应一个数据对象。 11 12### 任务依赖 13 14> **说明:** 15> 16> 当任务句柄出现在一个任务的`in_deps`中时,任务句柄对应的任务是该任务的前置任务;当任务句柄出现在一个任务的`out_deps`中时,任务句柄对应的任务是该任务的后继任务。 17 18任务依赖适用于任务之间有明确顺序或逻辑流程要求的场景,例如: 19 20- 顺序执行的任务,例如:先进行数据预处理任务,然后再进行模型训练任务。 21- 逻辑流程控制,例如:商品交易过程中,通常是先下单,然后是制作,最后是物流运输。 22- 多级任务链,例如:流媒体视频处理过程中,视频解析后可以进行视频转码和视频生成缩略图,然后是视频添加水印,最后是视频发布。 23 24### 数据依赖 25 26> **说明:** 27> 28> 当数据对象的签名出现在一个任务的`in_deps`中时,该任务称为数据对象的消费者任务,消费者任务执行不改变其输入数据对象的内容; 29> 当数据对象的签名出现在任务的`out_deps`中时,该任务称为数据对象的生产者任务,生产者任务执行改变其输出数据对象的内容,从而生成该数据对象的一个新的版本。 30 31数据依赖适用于任务之间通过数据生产和消费关系来触发执行的场景。 32 33一个数据对象可能存在多个版本,每个版本对应一个生产者任务和零个,一个或多个消费者任务,根据生产者任务和消费者任务的下发顺序定义数据对象的多个版本的顺序,以及每个版本所对应的生产者和消费者任务。 34 35数据依赖解除的任务进入就绪状态允许被调度执行,依赖解除状态指任务所有输入数据对象版本的生产者任务执行完成,且所有输出数据对象版本的所有消费者任务执行完成的状态。 36 37FFRT在运行时可动态构建任务之间的基于生产者/消费者的数据依赖关系并遵循任务数据依赖状态执行调度,包括: 38 39- Producer-Consumer依赖 40 41 一个数据对象版本的生产者任务和该数据对象版本的消费者任务之间形成的依赖关系,也称为Read-after-Write依赖。 42 43- Consumer-Producer依赖 44 45 一个数据对象版本的消费者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Read依赖。 46 47- Producer-Producer依赖 48 49 一个数据对象版本的生产者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Write依赖。 50 51例如,存在一组任务与数据A的关系表述为: 52 53```cpp 54task1(OUT A); 55task2(IN A); 56task3(IN A); 57task4(OUT A); 58task5(OUT A); 59``` 60 61 62 63为表述方便,本文中的数据流图均以圆圈表示Task,方块表示数据。 64 65可以得出以下结论: 66 67- task1与task2/task3构成Producer-Consumer依赖,即:task2/task3需要等到task1写完A之后才能读A。 68- task2/task3与task4构成Consumer-Producer依赖,即:task4需要等到task2/task3读完A之后才能写A。 69- task4与task5构成Producer-Producer依赖,即:task5需要等到task4写完A之后才能写A。 70 71## 示例:流媒体视频处理 72 73用户上传视频到流媒体平台,处理步骤包含:视频解析A、视频转码B、视频缩略图生成C、视频水印添加D和视频发布E,其中步骤B和步骤C可以并行执行。任务流程如下图所示: 74 75 76 77借助FFRT提供了图依赖并发范式,可以描述任务依赖关系,同时并行化上述视频处理流程,代码如下所示: 78 79```c 80#include <stdio.h> 81#include "ffrt/task.h" 82 83static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func, 84 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 85{ 86 ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 87} 88 89static inline ffrt_task_handle_t ffrt_submit_h_c(ffrt_function_t func, const ffrt_function_t after_func, 90 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 91{ 92 return ffrt_submit_h_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 93} 94 95void func_TaskA(void* arg) 96{ 97 printf("视频解析\n"); 98} 99 100void func_TaskB(void* arg) 101{ 102 printf("视频转码\n"); 103} 104 105void func_TaskC(void* arg) 106{ 107 printf("视频生成缩略图\n"); 108} 109 110void func_TaskD(void* arg) 111{ 112 printf("视频添加水印\n"); 113} 114 115void func_TaskE(void* arg) 116{ 117 printf("视频发布\n"); 118} 119 120int main() 121{ 122 // 提交任务A 123 ffrt_task_handle_t hTaskA = ffrt_submit_h_c(func_TaskA, NULL, NULL, NULL, NULL, NULL); 124 125 // 提交任务B和C 126 ffrt_dependence_t taskA_deps[] = {{ffrt_dependence_task, hTaskA}}; 127 ffrt_deps_t dTaskA = {1, taskA_deps}; 128 ffrt_task_handle_t hTaskB = ffrt_submit_h_c(func_TaskB, NULL, NULL, &dTaskA, NULL, NULL); 129 ffrt_task_handle_t hTaskC = ffrt_submit_h_c(func_TaskC, NULL, NULL, &dTaskA, NULL, NULL); 130 131 // 提交任务D 132 ffrt_dependence_t taskBC_deps[] = {{ffrt_dependence_task, hTaskB}, {ffrt_dependence_task, hTaskC}}; 133 ffrt_deps_t dTaskBC = {2, taskBC_deps}; 134 ffrt_task_handle_t hTaskD = ffrt_submit_h_c(func_TaskD, NULL, NULL, &dTaskBC, NULL, NULL); 135 136 // 提交任务E 137 ffrt_dependence_t taskD_deps[] = {{ffrt_dependence_task, hTaskD}}; 138 ffrt_deps_t dTaskD = {1, taskD_deps}; 139 ffrt_submit_c(func_TaskE, NULL, NULL, &dTaskD, NULL, NULL); 140 141 // 等待所有任务完成 142 ffrt_wait(); 143 return 0; 144} 145``` 146 147C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。 148 149```c 150typedef struct { 151 ffrt_function_header_t header; 152 ffrt_function_t func; 153 ffrt_function_t after_func; 154 void* arg; 155} c_function_t; 156 157static inline void ffrt_exec_function_wrapper(void* t) 158{ 159 c_function_t* f = (c_function_t *)t; 160 if (f->func) { 161 f->func(f->arg); 162 } 163} 164 165static inline void ffrt_destroy_function_wrapper(void* t) 166{ 167 c_function_t* f = (c_function_t *)t; 168 if (f->after_func) { 169 f->after_func(f->arg); 170 } 171} 172 173#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1]) 174static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func, 175 const ffrt_function_t after_func, void *arg) 176{ 177 FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size, 178 size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size); 179 180 c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general); 181 f->header.exec = ffrt_exec_function_wrapper; 182 f->header.destroy = ffrt_destroy_function_wrapper; 183 f->func = func; 184 f->after_func = after_func; 185 f->arg = arg; 186 return (ffrt_function_header_t *)f; 187} 188``` 189 190预期的输出可能为: 191 192```plain 193视频解析 194视频转码 195视频生成缩略图 196视频添加水印 197视频发布 198``` 199 200## 示例:斐波那契数列 201 202斐波那契数列中每个数字是前两个数字之和,计算斐波那契数的过程可以很好地通过数据对象来表达任务依赖关系。使用FFRT并发编程框架计算斐波那契数的代码如下所示: 203 204```c 205#include <stdio.h> 206#include "ffrt/task.h" 207 208typedef struct { 209 int x; 210 int* y; 211} fib_ffrt_s; 212 213static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func, 214 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 215{ 216 ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 217} 218 219void fib_ffrt(void* arg) 220{ 221 fib_ffrt_s* p = (fib_ffrt_s*)arg; 222 int x = p->x; 223 int* y = p->y; 224 225 if (x <= 1) { 226 *y = x; 227 } else { 228 int y1, y2; 229 fib_ffrt_s s1 = {x - 1, &y1}; 230 fib_ffrt_s s2 = {x - 2, &y2}; 231 232 // 构建数据依赖 233 ffrt_dependence_t dx_deps[] = {{ffrt_dependence_data, &x}}; 234 ffrt_deps_t dx = {1, dx_deps}; 235 ffrt_dependence_t dy1_deps[] = {{ffrt_dependence_data, &y1}}; 236 ffrt_deps_t dy1 = {1, dy1_deps}; 237 ffrt_dependence_t dy2_deps[] = {{ffrt_dependence_data, &y2}}; 238 ffrt_deps_t dy2 = {1, dy2_deps}; 239 ffrt_dependence_t dy12_deps[] = {{ffrt_dependence_data, &y1}, {ffrt_dependence_data, &y2}}; 240 ffrt_deps_t dy12 = {2, dy12_deps}; 241 242 // 分别提交任务 243 ffrt_submit_c(fib_ffrt, NULL, &s1, &dx, &dy1, NULL); 244 ffrt_submit_c(fib_ffrt, NULL, &s2, &dx, &dy2, NULL); 245 246 // 等待任务完成 247 ffrt_wait_deps(&dy12); 248 *y = y1 + y2; 249 } 250} 251 252int main() 253{ 254 int r; 255 fib_ffrt_s s = {5, &r}; 256 ffrt_dependence_t dr_deps[] = {{ffrt_dependence_data, &r}}; 257 ffrt_deps_t dr = {1, dr_deps}; 258 ffrt_submit_c(fib_ffrt, NULL, &s, NULL, &dr, NULL); 259 260 // 等待任务完成 261 ffrt_wait_deps(&dr); 262 printf("Fibonacci(5) is %d\n", r); 263 return 0; 264} 265``` 266 267预期输出为: 268 269```plain 270Fibonacci(5) is 5 271``` 272 273示例中将`fibonacci(x-1)`和`fibonacci(x-2)`作为两个任务提交给FFRT,在两个任务完成之后将结果进行累加。虽然单个任务只是拆分成两个子任务,但是子任务又可以继续进行拆分,因此整个计算图的并行度是非常高的。 274 275各个任务在FFRT内部形成了一颗调用树: 276 277 278 279## 接口说明 280 281上述样例中涉及到主要的FFRT的接口包括: 282 283| 名称 | 描述 | 284| ---------------------------------------------------------------- | -------------------------------------- | 285| [ffrt_submit_base](ffrt-api-guideline-c.md#ffrt_submit_base) | 提交任务调度执行。 | 286| [ffrt_submit_h_base](ffrt-api-guideline-c.md#ffrt_submit_h_base) | 提交任务调度执行并返回任务句柄。 | 287| [ffrt_wait_deps](ffrt-api-guideline-c.md#ffrt_wait_deps) | 等待依赖的任务完成,当前任务开始执行。 | 288 289## 约束限制 290 291- 使用`ffrt_submit_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过8个。 292- 使用`ffrt_submit_h_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过7个。 293- 参数既作为输入依赖又作为输出依赖的时候,统计依赖数量时只统计一次,如输入依赖是`{&x}`,输出依赖也是`{&x}`,实际依赖的数量是1。 294