1# Function Flow Runtime图依赖并发(C) 2 3## 概述 4 5FFRT图依赖并发范式支持任务依赖和数据依赖两种方式构建任务依赖图。任务依赖图中每个节点表示一个任务,边表示任务之间的依赖关系。任务依赖分为输入依赖`in_deps`和输出依赖`out_deps`。 6 7构建任务依赖图的两种不同方式: 8 9- 当使用任务依赖方式构建任务依赖图时,使用任务句柄`handle`来对应一个任务对象。 10- 当使用数据依赖方式构建任务依赖图时,数据对象表达抽象为数据签名,每个数据签名唯一对应一个数据对象。 11 12### 任务依赖 13 14> **说明:** 15> 16> 当任务句柄出现在一个任务的`in_deps`中时,任务句柄对应的任务是该任务的前置任务;当任务句柄出现在一个任务的`out_deps`中时,任务句柄对应的任务是该任务的后继任务。 17 18任务依赖适用于任务之间有明确顺序或逻辑流程要求的场景,例如: 19 20- 顺序执行的任务,例如:先进行数据预处理任务,然后再进行模型训练任务。 21- 逻辑流程控制,例如:商品交易过程中,通常是先下单,然后是制作,最后是物流运输。 22- 多级任务链,例如:流媒体视频处理过程中,视频解析后可以进行视频转码和视频生成缩略图,然后是视频添加水印,最后是视频发布。 23 24### 数据依赖 25 26> **说明:** 27> 28> 当数据对象的签名出现在一个任务的`in_deps`中时,该任务称为数据对象的消费者任务,消费者任务执行不改变其输入数据对象的内容; 29> 当数据对象的签名出现在任务的`out_deps`中时,该任务称为数据对象的生产者任务,生产者任务执行改变其输出数据对象的内容,从而生成该数据对象的一个新的版本。 30 31数据依赖适用于任务之间通过数据生产和消费关系来触发执行的场景。 32 33一个数据对象可能存在多个版本,每个版本对应一个生产者任务和零个,一个或多个消费者任务,根据生产者任务和消费者任务的下发顺序定义数据对象的多个版本的顺序,以及每个版本所对应的生产者和消费者任务。 34 35数据依赖解除的任务进入就绪状态允许被调度执行,依赖解除状态指任务所有输入数据对象版本的生产者任务执行完成,且所有输出数据对象版本的所有消费者任务执行完成的状态。 36 37FFRT在运行时可动态构建任务之间的基于生产者/消费者的数据依赖关系并遵循任务数据依赖状态执行调度,包括: 38 39- Producer-Consumer依赖 40 41 一个数据对象版本的生产者任务和该数据对象版本的消费者任务之间形成的依赖关系,也称为Read-after-Write依赖。 42 43- Consumer-Producer依赖 44 45 一个数据对象版本的消费者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Read依赖。 46 47- Producer-Producer依赖 48 49 一个数据对象版本的生产者任务和该数据对象的下一个版本的生产者任务之间形成的依赖关系,也称为Write-after-Write依赖。 50 51例如,存在一组任务与数据A的关系表述为: 52 53```cpp 54task1(OUT A); 55task2(IN A); 56task3(IN A); 57task4(OUT A); 58task5(OUT A); 59``` 60 61 62 63为表述方便,本文中的数据流图均以圆圈表示Task,方块表示数据。 64 65可以得出以下结论: 66 67- task1与task2/task3构成Producer-Consumer依赖,即:task2/task3需要等到task1写完A之后才能读A。 68- task2/task3与task4构成Consumer-Producer依赖,即:task4需要等到task2/task3读完A之后才能写A。 69- task4与task5构成Producer-Producer依赖,即:task5需要等到task4写完A之后才能写A。 70 71## 示例:流媒体视频处理 72 73用户上传视频到流媒体平台,处理步骤包含:视频解析A、视频转码B、视频缩略图生成C、视频水印添加D和视频发布E,其中步骤B和步骤C可以并行执行。任务流程如下图所示: 74 75 76 77借助FFRT提供了图依赖并发范式,可以描述任务依赖关系,同时并行化上述视频处理流程,代码如下所示: 78 79```c 80static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func, 81 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 82{ 83 ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 84} 85 86static inline ffrt_task_handle_t ffrt_submit_h_c(ffrt_function_t func, const ffrt_function_t after_func, 87 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 88{ 89 return ffrt_submit_h_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 90} 91 92void func_TaskA(void* arg) 93{ 94 printf("视频解析\n"); 95} 96 97void func_TaskB(void* arg) 98{ 99 printf("视频转码\n"); 100} 101 102void func_TaskC(void* arg) 103{ 104 printf("视频生成缩略图\n"); 105} 106 107void func_TaskD(void* arg) 108{ 109 printf("视频添加水印\n"); 110} 111 112void func_TaskE(void* arg) 113{ 114 printf("视频发布\n"); 115} 116 117int main() 118{ 119 ffrt_task_handle_t hTaskA = ffrt_submit_h_c(func_TaskA, NULL, NULL, NULL, NULL, NULL); 120 const std::vector<ffrt_dependence_t> taskA_deps = {{ffrt_dependence_task, hTaskA}}; 121 ffrt_deps_t dTaskA{static_cast<uint32_t>(taskA_deps.size()), taskA_deps.data()}; 122 123 ffrt_task_handle_t hTaskB = ffrt_submit_h_c(func_TaskB, NULL, NULL, &dTaskA, NULL, NULL); 124 ffrt_task_handle_t hTaskC = ffrt_submit_h_c(func_TaskC, NULL, NULL, &dTaskA, NULL, NULL); 125 126 const std::vector<ffrt_dependence_t> taskBC_deps = {{ffrt_dependence_task, hTaskB}, {ffrt_dependence_task, hTaskC}}; 127 ffrt_deps_t dTaskBC{static_cast<uint32_t>(taskBC_deps.size()), taskBC_deps.data()}; 128 129 ffrt_task_handle_t hTaskD = ffrt_submit_h_c(func_TaskD, NULL, NULL, &dTaskBC, NULL, NULL); 130 131 const std::vector<ffrt_dependence_t> taskD_deps = {{ffrt_dependence_task, hTaskD}}; 132 ffrt_deps_t dTaskD{static_cast<uint32_t>(taskD_deps.size()), taskD_deps.data()}; 133 134 ffrt_submit_c(func_TaskE, NULL, NULL, &dTaskD, NULL, NULL); 135 136 ffrt_wait(); 137 return 0; 138} 139``` 140 141C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。 142 143```c 144typedef struct { 145 ffrt_function_header_t header; 146 ffrt_function_t func; 147 ffrt_function_t after_func; 148 void* arg; 149} c_function_t; 150 151static inline void ffrt_exec_function_wrapper(void* t) 152{ 153 c_function_t* f = (c_function_t *)t; 154 if (f->func) { 155 f->func(f->arg); 156 } 157} 158 159static inline void ffrt_destroy_function_wrapper(void* t) 160{ 161 c_function_t* f = (c_function_t *)t; 162 if (f->after_func) { 163 f->after_func(f->arg); 164 } 165} 166 167#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1]) 168static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func, 169 const ffrt_function_t after_func, void *arg) 170{ 171 FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size, 172 size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size); 173 174 c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general); 175 f->header.exec = ffrt_exec_function_wrapper; 176 f->header.destroy = ffrt_destroy_function_wrapper; 177 f->func = func; 178 f->after_func = after_func; 179 f->arg = arg; 180 return (ffrt_function_header_t *)f; 181} 182``` 183 184预期的输出可能为: 185 186```plain 187视频解析 188视频转码 189视频生成缩略图 190视频添加水印 191视频发布 192``` 193 194## 示例:斐波那契数列 195 196斐波那契数列中每个数字是前两个数字之和,计算斐波那契数的过程可以很好地通过数据对象来表达任务依赖关系。使用FFRT并发编程框架计算斐波那契数的代码如下所示: 197 198```c 199#include <stdio.h> 200#include "ffrt.h" 201 202typedef struct { 203 int x; 204 int* y; 205} fib_ffrt_s; 206 207static inline void ffrt_submit_c(ffrt_function_t func, const ffrt_function_t after_func, 208 void* arg, const ffrt_deps_t* in_deps, const ffrt_deps_t* out_deps, const ffrt_task_attr_t* attr) 209{ 210 ffrt_submit_base(ffrt_create_function_wrapper(func, after_func, arg), in_deps, out_deps, attr); 211} 212 213void fib_ffrt(void* arg) 214{ 215 fib_ffrt_s* p = (fib_ffrt_s*)arg; 216 int x = p->x; 217 int* y = p->y; 218 219 if (x <= 1) { 220 *y = x; 221 } else { 222 int y1, y2; 223 fib_ffrt_s s1 = {x - 1, &y1}; 224 fib_ffrt_s s2 = {x - 2, &y2}; 225 const std::vector<ffrt_dependence_t> dx_deps = {{ffrt_dependence_data, &x}}; 226 ffrt_deps_t dx{static_cast<uint32_t>(dx_deps.size()), dx_deps.data()}; 227 const std::vector<ffrt_dependence_t> dy1_deps = {{ffrt_dependence_data, &y1}}; 228 ffrt_deps_t dy1{static_cast<uint32_t>(dy1_deps.size()), dy1_deps.data()}; 229 const std::vector<ffrt_dependence_t> dy2_deps = {{ffrt_dependence_data, &y2}}; 230 ffrt_deps_t dy2{static_cast<uint32_t>(dy2_deps.size()), dy2_deps.data()}; 231 const std::vector<ffrt_dependence_t> dy12_deps = {{ffrt_dependence_data, &y1}, {ffrt_dependence_data, &y2}}; 232 ffrt_deps_t dy12{static_cast<uint32_t>(dy12_deps.size()), dy12_deps.data()}; 233 ffrt_submit_c(fib_ffrt, NULL, &s1, &dx, &dy1, NULL); 234 ffrt_submit_c(fib_ffrt, NULL, &s2, &dx, &dy2, NULL); 235 ffrt_wait_deps(&dy12); 236 *y = y1 + y2; 237 } 238} 239 240int main(int narg, char** argv) 241{ 242 int r; 243 fib_ffrt_s s = {5, &r}; 244 const std::vector<ffrt_dependence_t> dr_deps = {{ffrt_dependence_data, &r}}; 245 ffrt_deps_t dr{static_cast<uint32_t>(dr_deps.size()), dr_deps.data()}; 246 ffrt_submit_c(fib_ffrt, NULL, &s, NULL, &dr, NULL); 247 ffrt_wait_deps(&dr); 248 printf("Fibonacci(5) is %d\n", r); 249 return 0; 250} 251``` 252 253预期输出为: 254 255```plain 256Fibonacci(5) is 5 257``` 258 259示例中将`fibonacci(x-1)`和`fibonacci(x-2)`作为两个任务提交给FFRT,在两个任务完成之后将结果进行累加。虽然单个任务只是拆分成两个子任务,但是子任务又可以继续进行拆分,因此整个计算图的并行度是非常高的。 260 261各个任务在FFRT内部形成了一颗调用树: 262 263 264 265## 接口说明 266 267上述样例中涉及到主要的FFRT的接口包括: 268 269| 名称 | 描述 | 270| ---------------------------------------------------------------- | -------------------------------------- | 271| [ffrt_submit_base](ffrt-api-guideline-c.md#ffrt_submit_base) | 提交任务调度执行。 | 272| [ffrt_submit_h_base](ffrt-api-guideline-c.md#ffrt_submit_h_base) | 提交任务调度执行并返回任务句柄。 | 273| [ffrt_wait_deps](ffrt-api-guideline-c.md#ffrt_wait_deps) | 等待依赖的任务完成,当前任务开始执行。 | 274 275## 约束限制 276 277- 使用`ffrt_submit_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过8个。 278- 使用`ffrt_submit_h_base`接口进行任务提交时,每个任务的输入依赖和输出依赖的数量之和不能超过7个。 279- 参数既作为输入依赖又作为输出依赖的时候,统计依赖数量时只统计一次,如输入依赖是`{&x}`,输出依赖也是`{&x}`,实际依赖的数量是1。 280