1# Function Flow Runtime串行队列(C) 2 3## 概述 4 5FFRT串行队列基于协程调度模型实现,提供高效的消息队列功能,支持异步通信、流量削峰、无锁化状态和资源管理以及架构解耦等多种业务场景。FFRT串行队列支持以下功能: 6 7- **队列创建销毁**,支持创建和销毁队列,创建时可指定队列名称和优先级。每个队列功能上相当于一个单独的线程,队列中的任务相对于用户线程异步执行。 8- **任务延迟**,支持在任务提交时设置延迟时间(`delay`),单位为微秒(`us`)。延迟任务将在`uptime`(提交时刻+延迟时间)后调度执行。 9- **串行调度**,同一队列中的任务按照`uptime`升序排列,严格串行执行。确保队列中上一个任务完成后,下一个任务才会开始执行。 10- **任务取消**,支持根据任务句柄取消未出队的任务。若任务已开始执行或执行完成,则无法取消。 11- **任务等待**,支持根据任务句柄等待任务完成。指定任务完成时,队列中所有`uptime`早于该任务的任务均已执行完成。 12- **任务优先级**,支持在任务提交时设置单个任务的优先级。优先级仅在任务出队后相对于系统其他负载生效,不影响同一队列内任务的串行顺序。若未设置任务优先级,则默认继承队列的优先级。 13 14## 示例:异步日志系统 15 16举例实现一个异步日志系统,主线程将日志任务提交到队列,后台线程从队列中取出任务并写入文件。这种方式既能保证日志的顺序性,又能避免文件写入操作阻塞主线程。 17 18借助FFRT并行化框架API,开发者只需专注于业务逻辑的实现,无需关注异步线程管理、线程安全及调度效率等问题。 19 20用例简化了异常处理和线程安全相关的一些逻辑,实现代码如下所示: 21 22```c 23#include <stdio.h> 24#include <stdlib.h> 25#include <string.h> 26#include "ffrt.h" 27 28typedef struct { 29 FILE *logFile; // 日志文件指针 30 ffrt_queue_t queue; // 任务队列 31} logger_t; 32 33// 全局Logger变量 34logger_t* g_logger = NULL; 35 36// 初始化日志系统 37logger_t *logger_create(const char *filename) 38{ 39 logger_t *logger = (logger_t *)malloc(sizeof(logger_t)); 40 if (!logger) { 41 perror("Failed to allocate memory for logger_t"); 42 return NULL; 43 } 44 45 // 打开日志文件 46 logger->logFile = fopen(filename, "a"); 47 if (!logger->logFile) { 48 perror("Failed to open log file"); 49 free(logger); 50 return NULL; 51 } 52 printf("Log file opened: %s\n", filename); 53 54 // 创建任务队列 55 logger->queue = ffrt_queue_create(ffrt_queue_serial, "logger_queue_c", NULL); 56 if (!logger->queue) { 57 perror("Failed to create queue"); 58 fclose(logger->logFile); 59 free(logger); 60 return NULL; 61 } 62 63 return logger; 64} 65 66// 销毁日志系统 67void logger_destroy(logger_t *logger) 68{ 69 if (logger) { 70 // 销毁队列 71 if (logger->queue) { 72 ffrt_queue_destroy(logger->queue); 73 } 74 75 // 关闭日志文件 76 if (logger->logFile) { 77 fclose(logger->logFile); 78 printf("Log file closed.\n"); 79 } 80 81 free(logger); 82 } 83} 84 85// 日志任务 86void write_task(void *arg) 87{ 88 char *message = (char *)arg; 89 if (g_logger && g_logger->logFile) { 90 fprintf(g_logger->logFile, "%s\n", message); 91 fflush(g_logger->logFile); 92 } 93 94 free(message); 95} 96 97// 添加日志任务 98void logger_log(logger_t *logger, const char *message) 99{ 100 if (!logger || !logger->queue) { 101 return; 102 } 103 104 // 复制消息字符串 105 char *messageCopy = strdup(message); 106 if (!messageCopy) { 107 perror("Failed to allocate memory for message"); 108 return; 109 } 110 111 ffrt_queue_submit(logger->queue, ffrt_create_function_wrapper(write_task, NULL, messageCopy), NULL); 112} 113 114int main() 115{ 116 // 初始化全局logger 117 g_logger = logger_create("log_c.txt"); 118 if (!g_logger) { 119 return -1; 120 } 121 122 // 使用全局logger添加日志任务 123 logger_log(g_logger, "Log message 1"); 124 logger_log(g_logger, "Log message 2"); 125 logger_log(g_logger, "Log message 3"); 126 127 // 模拟主线程继续执行其他任务 128 sleep(1); 129 130 // 销毁全局logger 131 logger_destroy(g_logger); 132 g_logger = NULL; 133 return 0; 134} 135``` 136 137C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。 138 139```c 140typedef struct { 141 ffrt_function_header_t header; 142 ffrt_function_t func; 143 ffrt_function_t after_func; 144 void* arg; 145} c_function_t; 146 147static inline void ffrt_exec_function_wrapper(void* t) 148{ 149 c_function_t* f = (c_function_t *)t; 150 if (f->func) { 151 f->func(f->arg); 152 } 153} 154 155static inline void ffrt_destroy_function_wrapper(void* t) 156{ 157 c_function_t* f = (c_function_t *)t; 158 if (f->after_func) { 159 f->after_func(f->arg); 160 } 161} 162 163#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1]) 164static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func, 165 const ffrt_function_t after_func, void *arg) 166{ 167 FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size, 168 size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size); 169 170 c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue); 171 f->header.exec = ffrt_exec_function_wrapper; 172 f->header.destroy = ffrt_destroy_function_wrapper; 173 f->func = func; 174 f->after_func = after_func; 175 f->arg = arg; 176 return (ffrt_function_header_t *)f; 177} 178``` 179 180## 接口说明 181 182上述样例中涉及到主要的FFRT的接口包括: 183 184| 名称 | 描述 | 185| ---------------------------------------------------------------- | ------------------------------ | 186| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create) | 创建队列。 | 187| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy) | 销毁队列。 | 188| [ffrt_queue_submit](ffrt-api-guideline-c.md#ffrt_queue_submit) | 提交一个任务到队列中调度执行。 | 189 190## 约束限制 191 192- **避免提交超长任务** FFRT内置进程级队列任务超时检测机制。当串行任务执行时间超过预设阈值(默认30秒)时,系统将打印和上报异常日志,并触发预设的进程超时回调函数(如已配置)。 193- **同步原语使用规范** 在提交给FFRT的任务闭包中,避免使用`std::mutex`、`std::condition_variable`和`std::recursive_mutex`,标准库同步原语会长时间占用FFRT Worker线程。请替换为FFRT提供的同步原语:`ffrt::mutex`、`ffrt::condition_variable`或`ffrt::recursive_mutex`,其用法与标准库相同。 194- **全局变量中的队列管理** 若在全局变量中管理串行队列,随业务进程销毁,测试程序中需注意生命周期解耦问题。在测试用例结束时,需显式释放串行队列,其他资源可随全局变量释放。原因是全局变量在主函数结束后析构,而串行队列的释放依赖于FFRT框架中的其他资源,此时这些资源可能已被销毁。 195