# Function Flow Runtime Concurrent Queue (C)

## Overview

The FFRT concurrent queue lets you set a maximum concurrency for the queue and a priority for each task. Tasks in the queue can run on multiple threads at the same time, which improves execution efficiency.

- **Queue concurrency**: You can set the maximum concurrency of a queue to limit how many tasks run at the same time. This keeps an excessive number of concurrent tasks from exhausting system resources, which helps maintain stability and performance.
- **Task priority**: You can set a priority for each task. Tasks are scheduled strictly by priority: tasks with a higher priority run before tasks with a lower priority so that key tasks are handled promptly, and tasks with the same priority run in the order in which they were queued.
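
The two rules above can be modeled in a few lines of plain C. The sketch below is only an illustration and does not use or reproduce FFRT: `demo_task_t`, `demo_pick_next`, and `demo_dispatch` are hypothetical names, and the model assumes that every task is already pending when dispatch starts and that tasks are started in rounds of at most `concurrency` tasks.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical priority levels for this model only; a smaller value is more urgent. */
typedef enum { DEMO_PRIO_HIGH = 0, DEMO_PRIO_LOW = 1 } demo_priority_t;

typedef struct {
    const char *name;
    demo_priority_t priority;
    int done; /* set to 1 once the task has been dispatched */
} demo_task_t;

/* Return the index of the next task to run, or -1 if nothing is pending.
 * The strict '<' keeps submission order among tasks of equal priority. */
static int demo_pick_next(const demo_task_t *tasks, size_t count)
{
    int best = -1;
    for (size_t i = 0; i < count; i++) {
        if (tasks[i].done) {
            continue;
        }
        if (best < 0 || tasks[i].priority < tasks[best].priority) {
            best = (int)i;
        }
    }
    return best;
}

/* Dispatch all pending tasks in rounds of at most `concurrency` tasks.
 * The dispatch order is written to `order`; the number of rounds is returned. */
static int demo_dispatch(demo_task_t *tasks, size_t count, int concurrency, int *order)
{
    assert(concurrency > 0);
    size_t dispatched = 0;
    int rounds = 0;
    for (;;) {
        int started = 0;
        while (started < concurrency) {
            int next = demo_pick_next(tasks, count);
            if (next < 0) {
                break;
            }
            tasks[next].done = 1;
            order[dispatched++] = next;
            started++;
        }
        if (started == 0) {
            break; /* nothing left to run */
        }
        rounds++;
    }
    return rounds;
}
```

With three low-priority tasks, one high-priority task, and one more low-priority task queued in that order, and a concurrency of 2, `demo_dispatch` records the high-priority task first and then the low-priority tasks in submission order, over three rounds.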

## Example: Bank Service System

Suppose that every customer, whether a regular customer or a VIP customer, submits service requests to a bank service system, and that requests from VIP customers are handled first.
The bank has two windows for handling the requests. This scenario can be modeled with FFRT as follows:

- **Queuing logic**: a concurrent queue.
- **Service windows**: the concurrency of the concurrent queue, which also equals the number of FFRT Worker threads.
- **Customer level**: the priority of tasks in the concurrent queue.

The implementation code is as follows:

```c
#include <stdio.h>
#include <unistd.h>
#include "ffrt/queue.h"
#include "ffrt/task.h"

ffrt_queue_t create_bank_system(const char *name, int concurrency)
{
    ffrt_queue_attr_t queue_attr;
    (void)ffrt_queue_attr_init(&queue_attr);
    ffrt_queue_attr_set_max_concurrency(&queue_attr, concurrency);

    // Create a concurrent queue.
    ffrt_queue_t queue = ffrt_queue_create(ffrt_queue_concurrent, name, &queue_attr);

    // Destroy the queue attributes after the queue is created.
    ffrt_queue_attr_destroy(&queue_attr);
    if (!queue) {
        printf("create queue failed\n");
        return NULL;
    }

    printf("create bank system successfully\n");
    return queue;
}

void destroy_bank_system(ffrt_queue_t queue_handle)
{
    ffrt_queue_destroy(queue_handle);
    printf("destroy bank system successfully\n");
}

void bank_business(void *arg)
{
    // Simulate 100 ms of service time.
    usleep(100 * 1000);
    const char *data = (const char *)arg;
    printf("saving or withdrawing for %s\n", data);
}

// Encapsulate the operation of submitting a task to a queue into a function.
ffrt_task_handle_t commit_request(ffrt_queue_t bank, void (*func)(void *), const char *name,
    ffrt_queue_priority_t level, int delay)
{
    ffrt_task_attr_t task_attr;
    (void)ffrt_task_attr_init(&task_attr);
    ffrt_task_attr_set_name(&task_attr, name);
    ffrt_task_attr_set_queue_priority(&task_attr, level);
    ffrt_task_attr_set_delay(&task_attr, delay);

    // The wrapper takes a void * argument, so cast away const explicitly; the task only reads the name.
    ffrt_task_handle_t handle =
        ffrt_queue_submit_h(bank, ffrt_create_function_wrapper(func, NULL, (void *)name), &task_attr);

    // Destroy the task attributes after the task is submitted.
    ffrt_task_attr_destroy(&task_attr);
    return handle;
}

// Encapsulate the operation of canceling a task in a queue into a function.
int cancel_request(ffrt_task_handle_t request)
{
    return ffrt_queue_cancel(request);
}

// Encapsulate the operation of waiting for a task in a queue into a function.
void wait_for_request(ffrt_task_handle_t task)
{
    ffrt_queue_wait(task);
}

int main(void)
{
    // Two service windows: the queue runs at most two tasks at the same time.
    ffrt_queue_t bank = create_bank_system("Bank", 2);
    if (!bank) {
        printf("create bank system failed\n");
        return -1;
    }
    commit_request(bank, bank_business, "customer1", ffrt_queue_priority_low, 0);
    commit_request(bank, bank_business, "customer2", ffrt_queue_priority_low, 0);
    commit_request(bank, bank_business, "customer3", ffrt_queue_priority_low, 0);
    commit_request(bank, bank_business, "customer4", ffrt_queue_priority_low, 0);

    // VIP customers have the priority to enjoy services.
    commit_request(bank, bank_business, "VIP", ffrt_queue_priority_high, 0);

    ffrt_task_handle_t task = commit_request(bank, bank_business, "customer5", ffrt_queue_priority_low, 0);
    ffrt_task_handle_t task_last = commit_request(bank, bank_business, "customer6", ffrt_queue_priority_low, 0);

    // Cancel the service for customer 5.
    cancel_request(task);

    // Wait until all customer services are complete.
    wait_for_request(task_last);
    destroy_bank_system(bank);

    return 0;
}
```

Constructing FFRT tasks in C requires an additional function wrapper. The wrapper is common code that does not depend on the specific service scenario.

```c
typedef struct {
    ffrt_function_header_t header;
    ffrt_function_t func;
    ffrt_function_t after_func;
    void* arg;
} c_function_t;

static inline void ffrt_exec_function_wrapper(void* t)
{
    c_function_t* f = (c_function_t *)t;
    if (f->func) {
        f->func(f->arg);
    }
}

static inline void ffrt_destroy_function_wrapper(void* t)
{
    c_function_t* f = (c_function_t *)t;
    if (f->after_func) {
        f->after_func(f->arg);
    }
}

#define FFRT_STATIC_ASSERT(cond, msg) int x(int static_assertion_##msg[(cond) ? 1 : -1])
static inline ffrt_function_header_t *ffrt_create_function_wrapper(const ffrt_function_t func,
    const ffrt_function_t after_func, void *arg)
{
    FFRT_STATIC_ASSERT(sizeof(c_function_t) <= ffrt_auto_managed_function_storage_size,
        size_of_function_must_be_less_than_ffrt_auto_managed_function_storage_size);

    c_function_t* f = (c_function_t *)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_queue);
    f->header.exec = ffrt_exec_function_wrapper;
    f->header.destroy = ffrt_destroy_function_wrapper;
    f->func = func;
    f->after_func = after_func;
    f->arg = arg;
    return (ffrt_function_header_t *)f;
}
```
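
The wrapper relies on a common C idiom: a struct whose first member is a header of callbacks can be handed to a runtime as a pointer to that header. The sketch below shows the idiom in isolation, without FFRT. All `demo_*` names are hypothetical, and `malloc`/`free` stand in for the storage that FFRT allocates and manages itself in the code above.

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-in for a runtime-defined header: the runtime knows only these two callbacks. */
typedef struct {
    void (*exec)(void *self);
    void (*destroy)(void *self);
} demo_header_t;

typedef void (*demo_function_t)(void *arg);

/* The header is the first member, so a demo_closure_t * is also a valid demo_header_t *. */
typedef struct {
    demo_header_t header;
    demo_function_t func;       /* task body */
    demo_function_t after_func; /* optional hook invoked when the task is destroyed */
    void *arg;
} demo_closure_t;

static void demo_exec(void *self)
{
    demo_closure_t *c = (demo_closure_t *)self;
    if (c->func) {
        c->func(c->arg);
    }
}

static void demo_destroy(void *self)
{
    demo_closure_t *c = (demo_closure_t *)self;
    if (c->after_func) {
        c->after_func(c->arg);
    }
    free(c); /* in this sketch the closure owns its own heap storage */
}

/* Pack a function and its argument behind the generic header. Returns NULL on allocation failure. */
static demo_header_t *demo_wrap(demo_function_t func, demo_function_t after_func, void *arg)
{
    demo_closure_t *c = (demo_closure_t *)malloc(sizeof(*c));
    if (!c) {
        return NULL;
    }
    c->header.exec = demo_exec;
    c->header.destroy = demo_destroy;
    c->func = func;
    c->after_func = after_func;
    c->arg = arg;
    return &c->header;
}

/* What a runtime would do with the header: run the task, then release it. */
static void demo_run(demo_header_t *h)
{
    assert(h != NULL);
    h->exec(h);
    h->destroy(h);
}

/* Two small task bodies used to observe the call order. */
static void demo_increment(void *arg) { *(int *)arg += 1; }
static void demo_add_ten(void *arg) { *(int *)arg += 10; }
```

Calling `demo_run(demo_wrap(demo_increment, demo_add_ten, &counter))` on an `int counter` runs the task body first and the destroy hook second, which mirrors how `func` and `after_func` are invoked through `header.exec` and `header.destroy`.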

## Available APIs

The main FFRT APIs involved in the preceding example are as follows:

| Name                                                                                               | Description                                   |
| -------------------------------------------------------------------------------------------------- | --------------------------------------------- |
| [ffrt_queue_create](ffrt-api-guideline-c.md#ffrt_queue_create)                                     | Creates a queue.                              |
| [ffrt_queue_destroy](ffrt-api-guideline-c.md#ffrt_queue_destroy)                                   | Destroys a queue.                             |
| [ffrt_task_attr_set_queue_priority](ffrt-api-guideline-c.md#ffrt_task_attr_set_queue_priority)     | Sets the priority of a task in a queue.       |
| [ffrt_queue_attr_set_max_concurrency](ffrt-api-guideline-c.md#ffrt_queue_attr_set_max_concurrency) | Sets the concurrency of the concurrent queue. |

## Constraints

1. `ffrt_queue_attr_t` must be initialized by calling `ffrt_queue_attr_init` before any attribute is set or read. When the `ffrt_queue_attr_t` is no longer needed, call `ffrt_queue_attr_destroy` explicitly to release its resources.
2. Every `ffrt_queue_t` must be released by explicitly calling `ffrt_queue_destroy` before the process exits.
3. Keep the maximum concurrency of a concurrent queue within a reasonable range. A value larger than the number of Worker threads brings no additional benefit, and a value that is too small may leave system resources underused.
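
One way to apply the third constraint is to bound the requested concurrency before passing it to `ffrt_queue_attr_set_max_concurrency`. The helper below is a minimal sketch under stated assumptions: `demo_clamp_concurrency` is a hypothetical name, and the caller is assumed to know the number of Worker threads, which this document does not say how to obtain.

```c
#include <assert.h>

/* Hypothetical helper: keep the requested concurrency between 1 and the number
 * of Worker threads. `worker_count` is supplied by the caller. */
static int demo_clamp_concurrency(int requested, int worker_count)
{
    assert(worker_count >= 1);
    if (requested < 1) {
        return 1; /* a queue needs at least one slot to make progress */
    }
    if (requested > worker_count) {
        return worker_count; /* extra slots beyond the Worker threads add nothing */
    }
    return requested;
}
```

For example, a request for 8 concurrent tasks with 4 Worker threads is reduced to 4, while a request for 2 is passed through unchanged.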