• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <cstdlib>
16 #include <cstring>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "qos.h"
19 #include "queue.h"
20 
21 #ifdef  __cplusplus
22 extern "C" {
23 #endif
24 #ifdef FFRT_IO_TASK_SCHEDULER
25 /* 从队列的首部取出 */
queue_pophead(struct queue_s * queue)26 void *queue_pophead(struct queue_s *queue)
27 {
28     unsigned int head;
29     unsigned int tail;
30     void *res;
31 
32     while (1) {
33         head = atomic_load(&queue->head);
34         tail = atomic_load(&queue->tail);
35         if (tail == head) {
36             return nullptr;
37         }
38         res = queue->buf[head % queue->capacity];
39         if (atomic_compare_exchange_weak(&queue->head, &head, head + 1)) {
40             return res;
41         }
42     }
43 }
44 
45 /*
46  * 加入到队列的尾部
47  * 返回值0表示OK 返回值ERROR_QUEUE_FULL表示队列已满插入失败
48  */
queue_pushtail(struct queue_s * queue,void * object)49 int queue_pushtail(struct queue_s *queue, void *object)
50 {
51     unsigned int head;
52     unsigned int tail;
53 
54     head = atomic_load(&queue->head);
55     tail = atomic_load(&queue->tail);
56     if ((tail - head) < queue->capacity) {
57         queue->buf[tail % queue->capacity] = object;
58         atomic_store(&queue->tail, tail + 1);
59         return 0;
60     }
61     return ERROR_QUEUE_FULL;
62 }
63 
64 /*
65  * 队列初始化
66  * 头尾指针初始化为0,容量为传入的容量
67  */
queue_init(struct queue_s * queue,unsigned int capacity)68 int queue_init(struct queue_s *queue, unsigned int capacity)
69 {
70     if (capacity == 0) {
71         return ERROR_QUEUE_ARG_INVALID;
72     }
73     queue->buf = (void **)malloc(sizeof(void *) * (capacity));
74     if (queue->buf == nullptr) {
75         FFRT_LOGE("queue malloc failed, size: %u", (sizeof(void *) * capacity));
76         return ERROR_QUEUE_BUF_ALLOC_FAILED;
77     }
78     queue->capacity = capacity;
79     atomic_store(&queue->head, (unsigned int)0);
80     atomic_store(&queue->tail, (unsigned int)0);
81     return 0;
82 }
83 
queue_destroy(struct queue_s * queue)84 void queue_destroy(struct queue_s *queue)
85 {
86     if (queue->buf != nullptr) {
87         free(queue->buf);
88         queue->buf = nullptr;
89     }
90     queue->capacity = 0;
91     atomic_store(&queue->head, (unsigned int)0);
92     atomic_store(&queue->tail, (unsigned int)0);
93 }
94 
95 /* 获取队列的长度 */
queue_length(struct queue_s * queue)96 unsigned int queue_length(struct queue_s *queue)
97 {
98     unsigned int head;
99     unsigned int tail;
100 
101     head = atomic_load(&queue->head);
102     tail = atomic_load(&queue->tail);
103     return (tail - head);
104 }
105 
106 /* 批量尾部插入 */
queue_pushtail_batch(struct queue_s * queue,void * buf[],unsigned int buf_len)107 unsigned int queue_pushtail_batch(struct queue_s *queue, void *buf[], unsigned int buf_len)
108 {
109     unsigned int head;
110     unsigned int tail;
111     unsigned int i;
112 
113     if (buf_len == 0) {
114         return 0;
115     }
116 
117     head = atomic_load(&queue->head);
118     tail = atomic_load(&queue->tail);
119     i = 0;
120     while ((tail - head) < queue->capacity) {
121         queue->buf[tail % queue->capacity] = buf[i];
122         tail++;
123         i++;
124         if (i == buf_len) {
125             break;
126         }
127     }
128     atomic_store(&queue->tail, tail);
129     return i;
130 }
131 
132 /* 从首部批量获取 */
queue_pophead_batch(struct queue_s * queue,void * buf[],unsigned int buf_len)133 unsigned int queue_pophead_batch(struct queue_s *queue, void *buf[], unsigned int buf_len)
134 {
135     unsigned int head;
136     unsigned int tail;
137     unsigned int pop_len;
138     unsigned int i;
139 
140     if (buf_len == 0) {
141         return 0;
142     }
143 
144     while (1) {
145         head = atomic_load(&queue->head);
146         tail = atomic_load(&queue->tail);
147         if (head == tail) {
148             return 0;
149         }
150         pop_len = ((tail - head) > buf_len) ? buf_len : (tail - head);
151         for (i = 0; i < pop_len; i++) {
152             buf[i] = queue->buf[(head + i) % queue->capacity];
153         }
154         if (atomic_compare_exchange_weak(&queue->head, &head, head + pop_len)) {
155             return pop_len;
156         }
157     }
158 }
159 
160 /* 首部批量取出后将批量推入队列 */
queue_pophead_pushtail_batch(struct queue_s * target_queue,struct queue_s * local_queue,unsigned int pop_len)161 unsigned int queue_pophead_pushtail_batch(struct queue_s *target_queue, struct queue_s *local_queue,
162     unsigned int pop_len)
163 {
164     if (pop_len == 0) {
165         return 0;
166     }
167     unsigned int target_head;
168     unsigned int target_tail;
169     unsigned int local_head;
170     unsigned int local_tail;
171     unsigned int i;
172     target_head = atomic_load(&target_queue->head);
173     target_tail = atomic_load(&target_queue->tail);
174     local_head = atomic_load(&local_queue->head);
175     local_tail = atomic_load(&local_queue->tail);
176     i = 0;
177     while (((local_tail - local_head) < local_queue->capacity) && (target_tail != target_head)) {
178         auto temp = queue_pophead(target_queue);
179         if (temp == nullptr) {
180             break;
181         }
182         local_queue->buf[local_tail % local_queue->capacity] = temp;
183         local_tail++;
184         i++;
185         if (i == pop_len) {
186             break;
187         }
188     }
189     atomic_store(&local_queue->tail, local_tail);
190     return i;
191 }
192 
193 /* 本地队列首部批量取出一半元素后将元素批量推入全局队列尾部 */
queue_pophead_to_gqueue_batch(struct queue_s * queue,unsigned int pop_len,int qos,queue_push_task_func_t func)194 void queue_pophead_to_gqueue_batch(struct queue_s* queue, unsigned int pop_len, int qos, queue_push_task_func_t func)
195 {
196     if (pop_len == 0) {
197         return;
198     }
199     unsigned int head;
200     unsigned int tail;
201     unsigned int i;
202 
203     head = atomic_load(&queue->head);
204     tail = atomic_load(&queue->tail);
205     i = 0;
206     while ((tail != head) && i <= pop_len) {
207         auto tmp = queue_pophead(queue);
208         if (!func(tmp, qos)) {
209             FFRT_LOGE("Submit IO task failed!");
210             return;
211         }
212         i++;
213     }
214     return;
215 }
216 
queue_capacity(struct queue_s * queue)217 unsigned int queue_capacity(struct queue_s *queue)
218 {
219     return queue->capacity;
220 }
221 
queue_prob(struct queue_s * queue)222 unsigned int queue_prob(struct queue_s *queue)
223 {
224     return queue_length(queue); // / (queue_capacity(queue) / 100);
225 }
226 #endif
227 #ifdef  __cplusplus
228 }
229 #endif
230 
231