1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef SOFTBUS_QUEUE_H
17 #define SOFTBUS_QUEUE_H
18
19 #include "securec.h"
20
21 #include "softbus_adapter_cpu.h"
22 #include "softbus_adapter_atomic.h"
23
24 #ifdef __cplusplus
25 #if __cplusplus
26 extern "C" {
27 #endif
28 #endif
29
/** @brief Error codes returned by the lock-free queue operations. */
typedef enum {
    QUEUE_INVAL = -10, /* invalid argument (e.g. NULL queue or node) */
    QUEUE_FULL = -9,   /* enqueue failed: no free slot in the ring */
    QUEUE_EMPTY = -8   /* dequeue failed: no element available */
} QueueError;
35
/**
 * @brief Lock free ring queue.
 *
 * Producer and consumer state live in separately padded sections so the
 * producing and consuming threads do not false-share a cache line
 * (pad sizes presumably target 64-byte cache lines -- TODO confirm for
 * all supported platforms).
 *
 * Indices (head/tail) grow monotonically as free-running 32-bit counters;
 * the enqueue/dequeue helpers reduce them modulo the ring size with
 * `mask` and rely on unsigned wraparound for correctness.
 */
typedef struct {
    uint32_t magic;      /* integrity tag -- NOTE(review): expected value set by QueueInit, not visible here */
    uint32_t unitNum;    /* queue node limit (capacity requested at creation) */
    uint8_t pad[56];     /* cache line pad */

    /* producer status -- size/mask duplicated here so the producer only touches its own line */
    struct {
        uint32_t size;          /* queue size (ring slot count) */
        uint32_t mask;          /* mask(size-1). Usable capacity is mask, one slot is sacrificed */
        volatile uint32_t head; /* producer head: next slot a producer will claim */
        volatile uint32_t tail; /* producer tail: slots below this are visible to consumers */
        uint8_t pad[48];        /* cache line pad */
    } producer;

    /* consumer status -- mirror of the producer section for the consuming side */
    struct {
        uint32_t size;          /* queue size (ring slot count) */
        uint32_t mask;          /* mask(size-1). */
        volatile uint32_t head; /* consumer head: next slot a consumer will claim */
        volatile uint32_t tail; /* consumer tail: slots below this are free for producers */
        uint8_t pad[48];        /* cache line pad */
    } consumer;

    /* trailing storage for the ring slots; zero-length array idiom, actual
     * length is established by the allocation in CreateQueue/QueueSizeCalc */
    uintptr_t nodes[0]; /* queue nodes */
} LockFreeQueue;
62
63 extern int32_t QueueSizeCalc(uint32_t unitNum, uint32_t* queueSize);
64
65 extern int32_t QueueCountGet(const LockFreeQueue* queue, uint32_t* count);
66
67 extern int32_t QueueInit(LockFreeQueue* queue, uint32_t unitNum);
68
69 extern LockFreeQueue* CreateQueue(uint32_t unitNum);
70
QueueIsEmpty(LockFreeQueue * queue)71 static inline int32_t QueueIsEmpty(LockFreeQueue* queue)
72 {
73 uint32_t producerTail;
74 uint32_t consumerTail;
75 uint32_t mask;
76
77 if (queue == NULL) {
78 return QUEUE_INVAL;
79 }
80
81 producerTail = queue->producer.tail;
82 consumerTail = queue->consumer.tail;
83 mask = queue->producer.mask;
84
85 if (((producerTail - consumerTail) & mask) == 0) {
86 return 0;
87 }
88 return -1;
89 }
90
/**
 * @brief Enqueue operation, thread unsafe.
 *
 * Single-producer variant: at most one thread may enqueue concurrently
 * (it pairs safely with the dequeue helpers for the consuming side).
 * Only the pointer value is stored; ownership of the pointed-to data
 * stays with the caller.
 *
 * @param queue initialized lock-free queue.
 * @param node  element pointer to publish (must not be NULL).
 * @return 0 on success, QUEUE_INVAL on NULL argument, QUEUE_FULL when no slot is free.
 */
static inline int32_t QueueSingleProducerEnqueue(LockFreeQueue *queue, const void *node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    producerHead = queue->producer.head;
    /* Read the producer head before the consumer tail (see the multi-producer
     * variant for the reversal hazard this prevents). */
    RMB();
    consumerTail = queue->consumer.tail;

    /*
     * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
     * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
     *    producerHead < consumerTail + mask - 0xFFFFFFFF
     * The subtraction of two 32-bit integers results in 32-bit modulo.
     * Therefore, the availableCount must be between 0 and the queue length.
     * Note the usable capacity is mask (= size - 1): one slot stays empty so
     * full and empty states remain distinguishable.
     */
    availableCount = (mask + consumerTail) - producerHead;

    if (availableCount < 1) {
        return QUEUE_FULL;
    }

    /* Single producer: claiming the slot needs no atomic operation. */
    producerNext = producerHead + 1;
    queue->producer.head = producerNext;

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure that the queue is filled with elements before updating the producer tail.
     * Prevents problems when the producer tail is updated first:
     * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
     *    but the consumer actually reads dirty elements.
     * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
     */
    WMB();

    /* Publish: consumers may now read nodes[producerHead & mask]. */
    queue->producer.tail = producerNext;
    return 0;
}
139
/**
 * @brief Enqueue operation, thread safe.
 *
 * Multi-producer variant: any number of threads may enqueue concurrently.
 * A CAS loop claims a slot, then producers publish their slots strictly in
 * claim order (each waits for the previous producer's tail update).
 * Only the pointer value is stored; ownership stays with the caller.
 *
 * @param queue initialized lock-free queue.
 * @param node  element pointer to publish (must not be NULL).
 * @return 0 on success, QUEUE_INVAL on NULL argument, QUEUE_FULL when no slot is free.
 */
static inline int32_t QueueMultiProducerEnqueue(LockFreeQueue* queue, const void* node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    bool success = false;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }

    mask = queue->producer.mask;
    do {
        producerHead = queue->producer.head;
        /*
         * Make sure the producer's head is read before the consumer's tail.
         * If the consumer tail is read first, then the consumer consumes the queue, and then other producers
         * produce the queue, the producer header may cross the consumer tail reversely.
         */
        RMB();
        consumerTail = queue->consumer.tail;

        /*
         * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
         * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
         *    producerHead < consumerTail + mask - 0xFFFFFFFF
         * The subtraction of two 32-bit integers results in 32-bit modulo.
         * Therefore, the availableCount must be between 0 and the queue length.
         */

        availableCount = (mask + consumerTail) - producerHead;

        if (availableCount < 1) {
            return QUEUE_FULL;
        }

        producerNext = producerHead + 1;
        /* Claim slot producerHead; on CAS failure another producer won the slot, retry. */
        success = SoftBusAtomicCmpAndSwap32(&queue->producer.head, producerHead, producerNext);
    } while (success == false);

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure that the queue is filled with elements before updating the producer tail.
     * Prevents problems when the producer tail is updated first:
     * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
     *    but the consumer actually reads dirty elements.
     * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
     */

    WMB();

    /* Waiting for other producers to complete enqueuing.
     * Tail updates must occur in claim order, otherwise a later slot could be
     * published while an earlier one still holds a dirty element. */
    while (queue->producer.tail != producerHead) {
        SoftBusYieldCpu();
    }

    queue->producer.tail += 1;
    return 0;
}
203
/**
 * @brief Dequeue operation, thread unsafe.
 *
 * Single-consumer variant: at most one thread may dequeue concurrently
 * (it pairs safely with the enqueue helpers for the producing side).
 *
 * @param queue initialized lock-free queue.
 * @param node  out-parameter receiving the dequeued element pointer.
 * @return 0 on success, QUEUE_INVAL on NULL argument, QUEUE_EMPTY when no element is available.
 */
static inline int32_t QueueSingleConsumerDequeue(LockFreeQueue* queue, void** node)
{
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    consumerHead = queue->consumer.head;

    /* Prevent producerTail from being read before consumerHead, causing queue head and tail reversal. */
    RMB();

    producerTail = queue->producer.tail;

    /*
     * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
     * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
     *    producerTail < consumerHead + mask - 0xFFFFFFFF
     * The subtraction of two 32-bit integers results in 32-bit modulo.
     * Therefore, the availableCount must be between 0 and the queue length.
     */
    availableCount = (producerTail - consumerHead);

    if (availableCount < 1) {
        return QUEUE_EMPTY;
    }

    /* Single consumer: claiming the slot needs no atomic operation. */
    consumerNext = consumerHead + 1;
    queue->consumer.head = consumerNext;

    /* Prevent the read of queue->nodes before the read of ProdTail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Ensure that the queue element is dequeued before updating the consumer's tail.
     * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
     * and can fill in new elements, which actually overwrites the elements that are not dequeued.
     * NOTE(review): this requires load->store ordering (the nodes[] read before the
     * tail store); a pure read barrier does not provide that on weakly-ordered
     * CPUs -- confirm the adapter's RMB() is strong enough on such targets.
     */
    RMB();

    /* Free the slot: producers may now reuse nodes[consumerHead & mask]. */
    queue->consumer.tail = consumerNext;
    return 0;
}
256
/**
 * @brief Dequeue operation, thread safe.
 *
 * Multi-consumer variant: any number of threads may dequeue concurrently.
 * A CAS loop claims a slot, then consumers release their slots strictly in
 * claim order (each waits for the previous consumer's tail update).
 *
 * @param queue initialized lock-free queue.
 * @param node  out-parameter receiving the dequeued element pointer.
 * @return 0 on success, QUEUE_INVAL on NULL argument, QUEUE_EMPTY when no element is available.
 */
static inline int32_t QueueMultiConsumerDequeue(LockFreeQueue *queue, void **node)
{
    bool success = false;
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    do {
        consumerHead = queue->consumer.head;

        /*
         * Make sure the consumer's head is read before the producer's tail.
         * If the producer tail is read first, then other consumers consume the queue,
         * and finally the generator produces the queue, the consumer head may cross the producer tail.
         */
        RMB();

        producerTail = queue->producer.tail;

        /*
         * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
         * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
         *    producerTail < consumerHead + mask - 0xFFFFFFFF
         * The subtraction of two 32-bit integers results in 32-bit modulo.
         * Therefore, the availableCount must be between 0 and the queue length.
         */

        availableCount = (producerTail - consumerHead);

        if (availableCount < 1) {
            return QUEUE_EMPTY;
        }

        consumerNext = consumerHead + 1;
        /* Claim slot consumerHead; on CAS failure another consumer won the slot, retry. */
        success = SoftBusAtomicCmpAndSwap32(&queue->consumer.head, consumerHead, consumerNext);
    } while (success == false);

    /* Prevent the read of queue->nodes before the read of ProdTail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Ensure that the queue element is dequeued before updating the consumer's tail.
     * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
     * and can fill in new elements, which actually overwrites the elements that are not dequeued.
     * NOTE(review): this requires load->store ordering; a pure read barrier does not
     * provide that on weakly-ordered CPUs -- confirm the adapter's RMB() semantics.
     */
    RMB();

    /* Waiting for other consumers to finish dequeuing.
     * Tail updates must occur in claim order so producers never see a slot
     * released before every earlier slot was actually read. */
    while (queue->consumer.tail != consumerHead) {
        SoftBusYieldCpu();
    }

    queue->consumer.tail += 1;

    return 0;
}
323
324 #ifdef __cplusplus
325 #if __cplusplus
326 }
327 #endif /* __cplusplus */
328 #endif /* __cplusplus */
329 #endif // SOFTBUS_QUEUE_H
330