• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime串行队列(C)
2
3## 概述
4
5FFRT串行队列基于协程调度模型实现,提供高效的消息队列功能,支持异步通信、流量削峰、无锁化状态和资源管理以及架构解耦等多种业务场景。FFRT串行队列支持以下功能:
6
7- **​队列创建销毁**​,支持创建和销毁队列,创建时可指定队列名称和优先级。每个队列功能上相当于一个单独的线程,队列中的任务相对于用户线程异步执行。
8- **任务延迟**​,支持在任务提交时设置延迟时间(`delay`),单位为微秒(`us`)。延迟任务将在`uptime`(提交时刻+延迟时间)后调度执行。
9- **​串行调度**​,同一队列中的任务按照`uptime`升序排列,严格串行执行。确保队列中上一个任务完成后,下一个任务才会开始执行。
10- **​任务取消**​,支持根据任务句柄取消未出队的任务。若任务已开始执行或执行完成,则无法取消。
11- **​任务等待**​,支持根据任务句柄等待任务完成。指定任务完成时,队列中所有`uptime`早于该任务的任务均已执行完成。
12- **任务优先级**​,支持在任务提交时设置单个任务的优先级。优先级仅在任务出队后相对于系统其他负载生效,不影响同一队列内任务的串行顺序。若未设置任务优先级,则默认继承队列的优先级。
13
14## 示例:异步日志系统
15
16举例实现一个异步日志系统,主线程将日志任务提交到队列,后台线程从队列中取出任务并写入文件。这种方式既能保证日志的顺序性,又能避免文件写入操作阻塞主线程。
17
18借助FFRT并行化框架API,开发者只需专注于业务逻辑的实现,无需关注异步线程管理、线程安全及调度效率等问题。
19
20用例简化了异常处理和线程安全相关的一些逻辑,实现代码如下所示:
21
22```c
23#include <stdio.h>
24#include <stdlib.h>
25#include <string.h>
26#include "ffrt.h"
27
28typedef struct {
29    FILE *logFile;          // 日志文件指针
30    ffrt_queue_t queue;     // 任务队列
31} logger_t;
32
33// 全局Logger变量
34logger_t* g_logger = NULL;
35
36// 初始化日志系统
37logger_t *logger_create(const char *filename)
38{
39    logger_t *logger = (logger_t *)malloc(sizeof(logger_t));
40    if (!logger) {
41        perror("Failed to allocate memory for logger_t");
42        return NULL;
43    }
44
45    // 打开日志文件
46    logger->logFile = fopen(filename, "a");
47    if (!logger->logFile) {
48        perror("Failed to open log file");
49        free(logger);
50        return NULL;
51    }
52    printf("Log file opened: %s\n", filename);
53
54    // 创建任务队列
55    logger->queue = ffrt_queue_create(ffrt_queue_serial, "logger_queue_c", NULL);
56    if (!logger->queue) {
57        perror("Failed to create queue");
58        fclose(logger->logFile);
59        free(logger);
60        return NULL;
61    }
62
63    return logger;
64}
65
66// 销毁日志系统
67void logger_destroy(logger_t *logger)
68{
69    if (logger) {
70        // 销毁队列
71        if (logger->queue) {
72            ffrt_queue_destroy(logger->queue);
73        }
74
75        // 关闭日志文件
76        if (logger->logFile) {
77            fclose(logger->logFile);
78            printf("Log file closed.\n");
79        }
80
81        free(logger);
82    }
83}
84
85// 日志任务
86void write_task(void *arg)
87{
88    char *message = (char *)arg;
89    if (g_logger && g_logger->logFile) {
90        fprintf(g_logger->logFile, "%s\n", message);
91        fflush(g_logger->logFile);
92    }
93
94    free(message);
95}
96
97// 添加日志任务
98void logger_log(logger_t *logger, const char *message)
99{
100    if (!logger || !logger->queue) {
101        return;
102    }
103
104    // 复制消息字符串
105    char *messageCopy = strdup(message);
106    if (!messageCopy) {
107        perror("Failed to allocate memory for message");
108        return;
109    }
110
111    ffrt_queue_submit(logger->queue, ffrt_create_function_wrapper(write_task, NULL, messageCopy), NULL);
112}
113
114int main()
115{
116    // 初始化全局logger
117    g_logger = logger_create("log_c.txt");
118    if (!g_logger) {
119        return -1;
120    }
121
122    // 使用全局logger添加日志任务
123    logger_log(g_logger, "Log message 1");
124    logger_log(g_logger, "Log message 2");
125    logger_log(g_logger, "Log message 3");
126
127    // 模拟主线程继续执行其他任务
128    sleep(1);
129
130    // 销毁全局logger
131    logger_destroy(g_logger);
132    g_logger = NULL;
133    return 0;
134}
135```
136
137C风格构建FFRT任务需要一些额外的封装,封装方式为公共代码,与具体业务场景无关,使用方可以考虑用公共机制封装管理。
138
139```c
140typedef struct {
141    ffrt_function_header_t header;
142    ffrt_function_t func;
143    ffrt_function_t after_func;
144    void* arg;
145} c_function_t;
146
147static inline void ffrt_exec_function_wrapper(void* t)
148{
149    c_function_t* f = (c_function_t *)t;
150    if (f->func) {
151        f->func(f->arg);
152    }
153}
154
155static inline void ffrt_destroy_function_wrapper(void* t)
156{
157    c_function_t* f = (c_function_t *)t;
158    if (f->after_func) {
159        f->after_func(f->arg);
160    }
161}
162
163#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
164static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
165    const ffrt_function_t after_func, void *arg)
166{
167    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
168        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
169
170    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue);
171    f->header.exec = ffrt_exec_function_wrapper;
172    f->header.destroy = ffrt_destroy_function_wrapper;
173    f->func = func;
174    f->after_func = after_func;
175    f->arg = arg;
176    return (ffrt_function_header_t *)f;
177}
178```
179
180## 接口说明
181
182上述样例中涉及到主要的FFRT的接口包括:
183
184| 名称                                                             | 描述                           |
185| ---------------------------------------------------------------- | ------------------------------ |
186| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create)   | 创建队列。                     |
187| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy) | 销毁队列。                     |
188| [ffrt_queue_submit](ffrt-api-guideline-c.md#ffrt_queue_submit)   | 提交一个任务到队列中调度执行。 |
189
190## 约束限制
191
192- **避免提交超长任务** FFRT内置进程级队列任务超时检测机制。当串行任务执行时间超过预设阈值(默认30秒)时,系统将打印和上报异常日志,并触发预设的进程超时回调函数(如已配置)。
193- **同步原语使用规范** 在提交给FFRT的任务闭包中,避免使用`std::mutex`、`std::condition_variable`和`std::recursive_mutex`,标准库同步原语会长时间占用FFRT Worker线程。请替换为FFRT提供的同步原语:`ffrt::mutex`、`ffrt::condition_variable`或`ffrt::recursive_mutex`,其用法与标准库相同。
194- **全局变量中的队列管理** 若在全局变量中管理串行队列,随业务进程销毁,测试程序中需注意生命周期解耦问题。在测试用例结束时,需显式释放串行队列,其他资源可随全局变量释放。原因是全局变量在主函数结束后析构,而串行队列的释放依赖于FFRT框架中的其他资源,此时这些资源可能已被销毁。
195