• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Function Flow Runtime Serial Queue (C)
2
3## Overview
4
5The FFRT serial queue is implemented based on the coroutine scheduling model. It provides efficient message queue functions and supports multiple service scenarios, such as asynchronous communication, mobile data peak clipping, lock-free status and resource management, and architecture decoupling. The following functions are supported:
6
7- **Queue creation and destruction**: The queue name and priority can be specified during creation. Each queue is equivalent to an independent thread. Tasks in the queue are executed asynchronously compared with user threads.
8- **Task delay**: The `delay` can be set when a task is submitted. The unit is `μs`. The delayed task will be scheduled and executed after `uptime` (submission time + delay time).
9- **Serial scheduling**: Tasks in the same queue are sorted in ascending order of `uptime` and executed in serial mode. Ensure that the next task starts to be executed only after the previous task in the queue is complete.
10- **Task canceling**: You can cancel a task that is not dequeued based on the task handle. The task cannot be canceled if it has been started or completed.
11- **Task waiting**: You can wait for a task to complete based on the task handle. When a specified task is complete, all tasks whose `uptime` is earlier than the specified task in the queue have been executed.
12- **Task priority**: You can set the priority of a single task when submitting the task. Priorities take effect only after a task is dequeued relative to other system loads, and do not affect the serial task order in the same queue. If the task priority is not set, the priority of the queue is inherited by default.
13
14## Example: Asynchronous Log System
15
16The following is an example of implementing an asynchronous log system. The main thread submits the log task to the queue, and the background thread obtains the task from the queue and writes the task to the file. It ensures the log sequence and prevents the main thread from being blocked by the file write operation.
17
18With FFRT APIs, you only need to focus on service logic implementation and do not need to pay attention to asynchronous thread management, thread security, and scheduling efficiency.
19
20The example simplifies the logic for handling exceptions and ensuring thread security. The code is as follows:
21
22```c
23#include <stdio.h>
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27#include "ffrt/queue.h"
28#include "ffrt/task.h"
29
30typedef struct {
31    FILE *logFile;          // Pointer to a log file.
32    ffrt_queue_t queue;     // Task queue.
33} logger_t;
34
35// Global logger variable.
36logger_t* g_logger = NULL;
37
38// Initialize the log system.
39logger_t *logger_create(const char *filename)
40{
41    logger_t *logger = (logger_t *)malloc(sizeof(logger_t));
42    if (!logger) {
43        perror("Failed to allocate memory for logger_t");
44        return NULL;
45    }
46
47    // Open the log file.
48    logger->logFile = fopen(filename, "a");
49    if (!logger->logFile) {
50        perror("Failed to open log file");
51        free(logger);
52        return NULL;
53    }
54    printf("Log file opened: %s\n", filename);
55
56    // Create a task queue.
57    logger->queue = ffrt_queue_create(ffrt_queue_serial, "logger_queue_c", NULL);
58    if (!logger->queue) {
59        perror("Failed to create queue");
60        fclose(logger->logFile);
61        free(logger);
62        return NULL;
63    }
64
65    return logger;
66}
67
68// Destroy the log system.
69void logger_destroy(logger_t *logger)
70{
71    if (logger) {
72        // Destroy the queue.
73        if (logger->queue) {
74            ffrt_queue_destroy(logger->queue);
75        }
76
77        // Close the log file.
78        if (logger->logFile) {
79            fclose(logger->logFile);
80            printf("Log file closed\n");
81        }
82
83        free(logger);
84    }
85}
86
87// Log task.
88void write_task(void *arg)
89{
90    char *message = (char *)arg;
91    if (g_logger && g_logger->logFile) {
92        fprintf(g_logger->logFile, "%s\n", message);
93        fflush(g_logger->logFile);
94    }
95
96    free(message);
97}
98
99// Add a log task.
100void logger_log(logger_t *logger, const char *message)
101{
102    if (!logger || !logger->queue) {
103        return;
104    }
105
106    // Copy the message string.
107    char *messageCopy = strdup(message);
108    if (!messageCopy) {
109        perror("Failed to allocate memory for message");
110        return;
111    }
112
113    ffrt_queue_submit(logger->queue, ffrt_create_function_wrapper(write_task, NULL, messageCopy), NULL);
114}
115
116int main()
117{
118    // Initialize the global logger.
119    g_logger = logger_create("log_c.txt");
120    if (!g_logger) {
121        return -1;
122    }
123
124    // Use the global logger to add a log task.
125    logger_log(g_logger, "Log message 1");
126    logger_log(g_logger, "Log message 2");
127    logger_log(g_logger, "Log message 3");
128
129    // Simulate the main thread to continue executing other tasks.
130    sleep(1);
131
132    // Destroy the global logger.
133    logger_destroy(g_logger);
134    g_logger = NULL;
135    return 0;
136}
137```
138
139C-style FFRT construction requires additional encapsulation using common code and is irrelevant to specific service scenarios.
140
141```c
142typedef struct {
143    ffrt_function_header_t header;
144    ffrt_function_t func;
145    ffrt_function_t after_func;
146    void* arg;
147} c_function_t;
148
149static inline void ffrt_exec_function_wrapper(void* t)
150{
151    c_function_t* f = (c_function_t *)t;
152    if (f->func) {
153        f->func(f->arg);
154    }
155}
156
157static inline void ffrt_destroy_function_wrapper(void* t)
158{
159    c_function_t* f = (c_function_t *)t;
160    if (f->after_func) {
161        f->after_func(f->arg);
162    }
163}
164
165#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
166static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
167    const ffrt_function_t after_func, void *arg)
168{
169    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
170        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);
171
172    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue);
173    f->header.exec = ffrt_exec_function_wrapper;
174    f->header.destroy = ffrt_destroy_function_wrapper;
175    f->func = func;
176    f->after_func = after_func;
177    f->arg = arg;
178    return (ffrt_function_header_t *)f;
179}
180```
181
182## Available APIs
183
184The main FFRT APIs involved in the preceding example are as follows:
185
186| Name                                                            | Description                          |
187| ---------------------------------------------------------------- | ------------------------------ |
188| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create)   | Creates a queue.                    |
189| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy) | Destroys a queue.                    |
190| [ffrt_queue_submit](ffrt-api-guideline-c.md#ffrt_queue_submit)   | Submits a task to a queue.|
191
192## Constraints
193
194- **Avoid submitting ultra-long tasks.** The FFRT has a built-in process-level queue task timeout detection mechanism. When the execution time of a serial task exceeds the preset threshold (30 seconds by default), the system prints and reports exception logs and triggers the preset process timeout callback function (if configured).
195- **Use synchronization primitives correctly.** Do not use `std::mutex`, `std::condition_variable`, or `std::recursive_mutex` in the task closure submitted to FFRT. As synchronization primitives in the standard library will occupy the FFRT Worker thread for a long time, you should use the synchronization primitives provided by FFRT: `ffrt::mutex`, `ffrt::condition_variable`, or `ffrt::recursive_mutex`. The usage is the same as that of the standard library.
196- **Manage queues in global variables.** If serial queues are managed in global variables and destroyed with service processes, pay attention to lifecycle decoupling in the test program. When the test is complete, the serial queue needs to be explicitly released. Other resources can be released with global variables. The reason is that global variables are destructed after the main function ends, and the release of serial queues depends on other resources in the FFRT framework, and the resources may have been destroyed.
197