/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SOFTBUS_QUEUE_H
#define SOFTBUS_QUEUE_H

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include "securec.h"

#include "softbus_adapter_cpu.h"
#include "softbus_adapter_atomic.h"

#ifdef __cplusplus
#if __cplusplus
extern "C" {
#endif
#endif

/** @brief Error codes returned by the lock-free queue API (success is 0). */
typedef enum {
    QUEUE_INVAL = -10, /* invalid argument, e.g. a NULL pointer */
    QUEUE_FULL  = -9,  /* enqueue failed: no free slot */
    QUEUE_EMPTY = -8   /* dequeue failed: no queued node */
} QueueError;
38
/** @brief Lock free ring queue.
 *
 * Producer and consumer cursors live in separately padded regions so the two
 * sides do not contend on one cache line (pad sizes assume 64-byte lines:
 * 4 + 4 + 56 = 64, and 4 * 4 + 48 = 64).
 * Head/tail indices grow monotonically and rely on 32-bit modular wrap-around;
 * a slot is addressed as (index & mask), so the node count must be a power of
 * two with mask == size - 1.
 */
typedef struct {
    uint32_t magic;               /* integrity tag; NOTE(review): presumably set by QueueInit - confirm in the .c file */
    uint32_t unitNum;             /* queue node limit */
    uint8_t pad[56];              /* cache line pad */

    /* producer status */
    struct {
        uint32_t size;            /* queue size */
        uint32_t mask;            /* mask (size - 1) */
        volatile uint32_t head;   /* producer head */
        volatile uint32_t tail;   /* producer tail */
        uint8_t pad[48];          /* cache line pad */
    } producer;

    /* consumer status */
    struct {
        uint32_t size;            /* queue size */
        uint32_t mask;            /* mask (size - 1) */
        volatile uint32_t head;   /* consumer head */
        volatile uint32_t tail;   /* consumer tail */
        uint8_t pad[48];          /* cache line pad */
    } consumer;

    /* Trailing node storage: C99 flexible array member instead of the
     * non-standard GNU zero-length array ([0]); layout is unchanged. */
    uintptr_t nodes[];
} LockFreeQueue;
65
/**
 * @brief Computes the byte size needed for a queue of unitNum nodes,
 *        written through queueSize.
 *        NOTE(review): semantics inferred from the name; definitions are in the
 *        corresponding .c file - confirm return codes there.
 */
extern int32_t QueueSizeCalc(uint32_t unitNum, uint32_t* queueSize);

/** @brief Reports the current number of queued nodes through count. */
extern int32_t QueueCountGet(const LockFreeQueue* queue, uint32_t* count);

/** @brief Initializes a caller-allocated queue for unitNum nodes. */
extern int32_t QueueInit(LockFreeQueue* queue, uint32_t unitNum);

/** @brief Allocates and initializes a queue for unitNum nodes; presumably returns NULL on failure. */
extern LockFreeQueue* CreateQueue(uint32_t unitNum);
73
/**
 * @brief Checks whether the queue currently holds no elements.
 *
 * @param queue queue to inspect
 * @return 0 when the queue is empty, -1 when at least one node is queued,
 *         QUEUE_INVAL when queue is NULL
 */
static inline int32_t QueueIsEmpty(LockFreeQueue* queue)
{
    if (queue == NULL) {
        return QUEUE_INVAL;
    }

    /* Snapshot the published (tail) cursors; producer tail is read first. */
    uint32_t pTail = queue->producer.tail;
    uint32_t cTail = queue->consumer.tail;
    uint32_t mask = queue->producer.mask;

    /* Masking keeps the modular distance within the ring capacity. */
    return (((pTail - cTail) & mask) == 0) ? 0 : -1;
}
93
/**
 * @brief Enqueue operation, thread unsafe: only a single producer thread may
 *        call this (consumers may still run concurrently).
 *
 * @param queue initialized queue to append to
 * @param node element to store; only the pointer value is kept, ownership stays with the caller
 * @return 0 on success, QUEUE_INVAL on a NULL argument, QUEUE_FULL when no free slot exists
 */
static inline int32_t QueueSingleProducerEnqueue(LockFreeQueue *queue, const void *node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    producerHead = queue->producer.head;
    /* Read the producer head before the consumer tail (see the multi-producer
     * variant); a stale consumer tail only under-reports free space. */
    RMB();
    consumerTail = queue->consumer.tail;

    /*
     * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
     * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
     *    producerHead < consumerTail + mask - 0xFFFFFFFF
     * The subtraction of two 32-bit integers results in 32-bit modulo.
     * Therefore, the availableCount must be between 0 and the queue length.
     */
    availableCount = (mask + consumerTail) - producerHead;

    if (availableCount < 1) {
        return QUEUE_FULL;
    }

    /* Single producer: a plain increment of the head claims the slot. */
    producerNext = producerHead + 1;
    queue->producer.head = producerNext;

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure that the queue is filled with elements before updating the producer tail.
     * Prevents problems when the producer tail is updated first:
     * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
     *    but the consumer actually reads dirty elements.
     * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
     */
    WMB();

    queue->producer.tail = producerNext;
    return 0;
}
142
/**
 * @brief Enqueue operation, thread safe: multiple producers may call this
 *        concurrently (slots are claimed with a CAS on the producer head).
 *
 * @param queue initialized queue to append to
 * @param node element to store; only the pointer value is kept, ownership stays with the caller
 * @return 0 on success, QUEUE_INVAL on a NULL argument, QUEUE_FULL when no free slot exists
 */
static inline int32_t QueueMultiProducerEnqueue(LockFreeQueue* queue, const void* node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    bool success = false;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }

    mask = queue->producer.mask;
    do {
        producerHead = queue->producer.head;
        /*
         * Make sure the producer's head is read before the consumer's tail.
         * If the consumer tail is read first, then the consumer consumes the queue, and then other producers
         * produce the queue, the producer header may cross the consumer tail reversely.
         */
        RMB();
        consumerTail = queue->consumer.tail;

        /*
         * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
         * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
         *    producerHead < consumerTail + mask - 0xFFFFFFFF
         * The subtraction of two 32-bit integers results in 32-bit modulo.
         * Therefore, the availableCount must be between 0 and the queue length.
         */
        availableCount = (mask + consumerTail) - producerHead;

        if (availableCount < 1) {
            return QUEUE_FULL;
        }

        producerNext = producerHead + 1;
        /* Try to claim slot producerHead; retry if another producer moved the head first. */
        success = SoftBusAtomicCmpAndSwap32(&queue->producer.head, producerHead, producerNext);
    } while (success == false);

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure that the queue is filled with elements before updating the producer tail.
     * Prevents problems when the producer tail is updated first:
     * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
     *    but the consumer actually reads dirty elements.
     * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
     */
    WMB();

    /* Publish in claim order: wait for producers that claimed earlier slots to finish enqueuing. */
    while (queue->producer.tail != producerHead) {
        sched_yield();
    }

    queue->producer.tail += 1;
    return 0;
}
206
/**
 * @brief Dequeue operation, thread unsafe: only a single consumer thread may
 *        call this (producers may still run concurrently).
 *
 * @param queue initialized queue to take from
 * @param node  [out] receives the stored pointer value
 * @return 0 on success, QUEUE_INVAL on a NULL argument, QUEUE_EMPTY when no node is queued
 */
static inline int32_t QueueSingleConsumerDequeue(LockFreeQueue* queue, void** node)
{
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    consumerHead = queue->consumer.head;

    /* Prevent producerTail from being read before consumerHead, causing queue head and tail reversal. */
    RMB();

    producerTail = queue->producer.tail;

    /*
     * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
     * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
     *    producerTail < consumerHead + mask - 0xFFFFFFFF
     * The subtraction of two 32-bit integers results in 32-bit modulo.
     * Therefore, the availableCount must be between 0 and the queue length.
     */
    availableCount = (producerTail - consumerHead);

    if (availableCount < 1) {
        return QUEUE_EMPTY;
    }

    /* Single consumer: a plain increment of the head claims the slot. */
    consumerNext = consumerHead + 1;
    queue->consumer.head = consumerNext;

    /* Prevent the read of queue->nodes before the read of ProdTail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Ensure that the queue element is dequeued before updating the consumer's tail.
     * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
     * and can fill in new elements, which actually overwrites the elements that are not dequeued.
     * NOTE(review): this orders a load against a later store; whether RMB() alone
     * is sufficient depends on softbus_adapter_cpu's barrier semantics - confirm.
     */
    RMB();

    queue->consumer.tail = consumerNext;
    return 0;
}
259
/**
 * @brief Dequeue operation, thread safe: multiple consumers may call this
 *        concurrently (slots are claimed with a CAS on the consumer head).
 *
 * @param queue initialized queue to take from
 * @param node  [out] receives the stored pointer value
 * @return 0 on success, QUEUE_INVAL on a NULL argument, QUEUE_EMPTY when no node is queued
 */
static inline int32_t QueueMultiConsumerDequeue(LockFreeQueue *queue, void **node)
{
    bool success = false;
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    do {
        consumerHead = queue->consumer.head;

        /*
         * Make sure the consumer's head is read before the producer's tail.
         * If the producer tail is read first, then other consumers consume the queue,
         * and finally the generator produces the queue, the consumer head may cross the producer tail.
         */
        RMB();

        producerTail = queue->producer.tail;

        /*
         * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
         * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
         *    producerTail < consumerHead + mask - 0xFFFFFFFF
         * The subtraction of two 32-bit integers results in 32-bit modulo.
         * Therefore, the availableCount must be between 0 and the queue length.
         */
        availableCount = (producerTail - consumerHead);

        if (availableCount < 1) {
            return QUEUE_EMPTY;
        }

        consumerNext = consumerHead + 1;
        /* Try to claim slot consumerHead; retry if another consumer moved the head first. */
        success = SoftBusAtomicCmpAndSwap32(&queue->consumer.head, consumerHead, consumerNext);
    } while (success == false);

    /* Prevent the read of queue->nodes before the read of ProdTail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Ensure that the queue element is dequeued before updating the consumer's tail.
     * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
     * and can fill in new elements, which actually overwrites the elements that are not dequeued.
     * NOTE(review): this orders a load against a later store; whether RMB() alone
     * is sufficient depends on softbus_adapter_cpu's barrier semantics - confirm.
     */
    RMB();

    /* Release in claim order: wait for consumers that claimed earlier slots to finish dequeuing. */
    while (queue->consumer.tail != consumerHead) {
        sched_yield();
    }

    queue->consumer.tail += 1;

    return 0;
}
326
#ifdef __cplusplus
#if __cplusplus
}
#endif /* __cplusplus */
#endif /* __cplusplus */
#endif // SOFTBUS_QUEUE_H