1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <cstdlib>
16 #include <cstring>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "qos.h"
19 #include "queue.h"
20
21 #ifdef __cplusplus
22 extern "C" {
23 #endif
24 #ifdef FFRT_IO_TASK_SCHEDULER
25 /* 从队列的首部取出 */
queue_pophead(struct queue_s * queue)26 void *queue_pophead(struct queue_s *queue)
27 {
28 unsigned int head;
29 unsigned int tail;
30 void *res;
31
32 while (1) {
33 head = atomic_load(&queue->head);
34 tail = atomic_load(&queue->tail);
35 if (tail == head) {
36 return nullptr;
37 }
38 res = queue->buf[head % queue->capacity];
39 if (atomic_compare_exchange_weak(&queue->head, &head, head + 1)) {
40 return res;
41 }
42 }
43 }
44
45 /*
46 * 加入到队列的尾部
47 * 返回值0表示OK 返回值ERROR_QUEUE_FULL表示队列已满插入失败
48 */
queue_pushtail(struct queue_s * queue,void * object)49 int queue_pushtail(struct queue_s *queue, void *object)
50 {
51 unsigned int head;
52 unsigned int tail;
53
54 head = atomic_load(&queue->head);
55 tail = atomic_load(&queue->tail);
56 if ((tail - head) < queue->capacity) {
57 queue->buf[tail % queue->capacity] = object;
58 atomic_store(&queue->tail, tail + 1);
59 return 0;
60 }
61 return ERROR_QUEUE_FULL;
62 }
63
64 /*
65 * 队列初始化
66 * 头尾指针初始化为0,容量为传入的容量
67 */
queue_init(struct queue_s * queue,unsigned int capacity)68 int queue_init(struct queue_s *queue, unsigned int capacity)
69 {
70 if (capacity == 0) {
71 return ERROR_QUEUE_ARG_INVALID;
72 }
73 queue->buf = (void **)malloc(sizeof(void *) * (capacity));
74 if (queue->buf == nullptr) {
75 FFRT_LOGE("queue malloc failed, size: %u", (sizeof(void *) * capacity));
76 return ERROR_QUEUE_BUF_ALLOC_FAILED;
77 }
78 queue->capacity = capacity;
79 atomic_store(&queue->head, (unsigned int)0);
80 atomic_store(&queue->tail, (unsigned int)0);
81 return 0;
82 }
83
queue_destroy(struct queue_s * queue)84 void queue_destroy(struct queue_s *queue)
85 {
86 if (queue->buf != nullptr) {
87 free(queue->buf);
88 queue->buf = nullptr;
89 }
90 queue->capacity = 0;
91 atomic_store(&queue->head, (unsigned int)0);
92 atomic_store(&queue->tail, (unsigned int)0);
93 }
94
95 /* 获取队列的长度 */
queue_length(struct queue_s * queue)96 unsigned int queue_length(struct queue_s *queue)
97 {
98 unsigned int head;
99 unsigned int tail;
100
101 head = atomic_load(&queue->head);
102 tail = atomic_load(&queue->tail);
103 return (tail - head);
104 }
105
106 /* 批量尾部插入 */
queue_pushtail_batch(struct queue_s * queue,void * buf[],unsigned int buf_len)107 unsigned int queue_pushtail_batch(struct queue_s *queue, void *buf[], unsigned int buf_len)
108 {
109 unsigned int head;
110 unsigned int tail;
111 unsigned int i;
112
113 if (buf_len == 0) {
114 return 0;
115 }
116
117 head = atomic_load(&queue->head);
118 tail = atomic_load(&queue->tail);
119 i = 0;
120 while ((tail - head) < queue->capacity) {
121 queue->buf[tail % queue->capacity] = buf[i];
122 tail++;
123 i++;
124 if (i == buf_len) {
125 break;
126 }
127 }
128 atomic_store(&queue->tail, tail);
129 return i;
130 }
131
132 /* 从首部批量获取 */
queue_pophead_batch(struct queue_s * queue,void * buf[],unsigned int buf_len)133 unsigned int queue_pophead_batch(struct queue_s *queue, void *buf[], unsigned int buf_len)
134 {
135 unsigned int head;
136 unsigned int tail;
137 unsigned int pop_len;
138 unsigned int i;
139
140 if (buf_len == 0) {
141 return 0;
142 }
143
144 while (1) {
145 head = atomic_load(&queue->head);
146 tail = atomic_load(&queue->tail);
147 if (head == tail) {
148 return 0;
149 }
150 pop_len = ((tail - head) > buf_len) ? buf_len : (tail - head);
151 for (i = 0; i < pop_len; i++) {
152 buf[i] = queue->buf[(head + i) % queue->capacity];
153 }
154 if (atomic_compare_exchange_weak(&queue->head, &head, head + pop_len)) {
155 return pop_len;
156 }
157 }
158 }
159
160 /* 首部批量取出后将批量推入队列 */
queue_pophead_pushtail_batch(struct queue_s * target_queue,struct queue_s * local_queue,unsigned int pop_len)161 unsigned int queue_pophead_pushtail_batch(struct queue_s *target_queue, struct queue_s *local_queue,
162 unsigned int pop_len)
163 {
164 if (pop_len == 0) {
165 return 0;
166 }
167 unsigned int target_head;
168 unsigned int target_tail;
169 unsigned int local_head;
170 unsigned int local_tail;
171 unsigned int i;
172 target_head = atomic_load(&target_queue->head);
173 target_tail = atomic_load(&target_queue->tail);
174 local_head = atomic_load(&local_queue->head);
175 local_tail = atomic_load(&local_queue->tail);
176 i = 0;
177 while (((local_tail - local_head) < local_queue->capacity) && (target_tail != target_head)) {
178 auto temp = queue_pophead(target_queue);
179 if (temp == nullptr) {
180 break;
181 }
182 local_queue->buf[local_tail % local_queue->capacity] = temp;
183 local_tail++;
184 i++;
185 if (i == pop_len) {
186 break;
187 }
188 }
189 atomic_store(&local_queue->tail, local_tail);
190 return i;
191 }
192
193 /* 本地队列首部批量取出一半元素后将元素批量推入全局队列尾部 */
queue_pophead_to_gqueue_batch(struct queue_s * queue,unsigned int pop_len,int qos,queue_push_task_func_t func)194 void queue_pophead_to_gqueue_batch(struct queue_s* queue, unsigned int pop_len, int qos, queue_push_task_func_t func)
195 {
196 if (pop_len == 0) {
197 return;
198 }
199 unsigned int head;
200 unsigned int tail;
201 unsigned int i;
202
203 head = atomic_load(&queue->head);
204 tail = atomic_load(&queue->tail);
205 i = 0;
206 while ((tail != head) && i <= pop_len) {
207 auto tmp = queue_pophead(queue);
208 if (!func(tmp, qos)) {
209 FFRT_LOGE("Submit IO task failed!");
210 return;
211 }
212 i++;
213 }
214 return;
215 }
216
queue_capacity(struct queue_s * queue)217 unsigned int queue_capacity(struct queue_s *queue)
218 {
219 return queue->capacity;
220 }
221
queue_prob(struct queue_s * queue)222 unsigned int queue_prob(struct queue_s *queue)
223 {
224 return queue_length(queue); // / (queue_capacity(queue) / 100);
225 }
226 #endif
227 #ifdef __cplusplus
228 }
229 #endif
230
231