• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SOFTBUS_QUEUE_H
17 #define SOFTBUS_QUEUE_H
18 
19 #include <pthread.h>
20 #include <stdbool.h>
21 #include <stdint.h>
22 #include "securec.h"
23 
24 #include "softbus_adapter_cpu.h"
25 #include "softbus_adapter_atomic.h"
26 
27 #ifdef __cplusplus
28 #if __cplusplus
29 extern "C" {
30 #endif
31 #endif
32 
/**
 * @brief Negative status codes returned by the queue operations in this header.
 *
 * Successful operations return 0, so every enumerator is negative.
 */
typedef enum {
    QUEUE_INVAL = -10,  /* a required pointer argument was NULL */
    QUEUE_FULL = -9,    /* enqueue refused: no free slot */
    QUEUE_EMPTY = -8    /* dequeue refused: nothing to take */
} QueueError;
38 
/**
 * @brief Lock free ring queue.
 *
 * Layout: a 64-byte header, a 64-byte producer block, a 64-byte consumer block,
 * followed by the node slots. The pad arrays round each section up to 64 bytes so
 * that producer and consumer indices do not share a section (cache line padding).
 *
 * Indices (head/tail) are free-running 32-bit counters; a slot is addressed with
 * (index & mask). The enqueue helpers in this header treat the queue as full once
 * (producer head - consumer tail) reaches mask, i.e. at most mask nodes are stored.
 */
typedef struct {
    uint32_t magic;    /* NOTE(review): presumably a validity marker written by QueueInit -- confirm */
    uint32_t unitNum;  /* queue node limit */
    uint8_t pad[56];   /* cache line pad */

    /* producer status */
    struct {
        uint32_t size;              /* queue size */
        uint32_t mask;              /* mask(size-1). */
        volatile uint32_t head;     /* producer head: next slot a producer will claim */
        volatile uint32_t tail;     /* producer tail: slots below this are published to consumers */
        uint8_t pad[48];            /* cache line pad */
    } producer;

    /* consumer status */
    struct {
        uint32_t size;              /* queue size */
        uint32_t mask;              /* mask(size-1). */
        volatile uint32_t head;     /* consumer head: next slot a consumer will claim */
        volatile uint32_t tail;     /* consumer tail: slots below this are released to producers */
        uint8_t pad[48];            /* cache line pad */
    } consumer;

    /*
     * Queue nodes, stored as pointer-sized integers. Declared as a C99 flexible
     * array member; the storage is whatever the allocator reserved past the struct.
     */
    uintptr_t nodes[];
} LockFreeQueue;
65 
66 extern int32_t QueueSizeCalc(uint32_t unitNum, uint32_t* queueSize);
67 
68 extern int32_t QueueCountGet(const LockFreeQueue* queue, uint32_t* count);
69 
70 extern int32_t QueueInit(LockFreeQueue* queue, uint32_t unitNum);
71 
72 extern LockFreeQueue* CreateQueue(uint32_t unitNum);
73 
/**
 * @brief Check whether the queue currently holds no published nodes.
 *
 * Compares the producer tail with the consumer tail (modulo the ring mask).
 * No memory barrier is issued, so the answer is only a snapshot.
 *
 * @param queue Queue to inspect.
 * @return QUEUE_INVAL if queue is NULL, 0 if the queue is empty, -1 if it is not.
 */
static inline int32_t QueueIsEmpty(LockFreeQueue* queue)
{
    if (queue == NULL) {
        return QUEUE_INVAL;
    }

    /* Read order kept: producer tail, then consumer tail, then mask. */
    const uint32_t publishedUpTo = queue->producer.tail;
    const uint32_t releasedUpTo = queue->consumer.tail;
    const uint32_t ringMask = queue->producer.mask;

    const uint32_t pending = (publishedUpTo - releasedUpTo) & ringMask;
    return (pending == 0) ? 0 : -1;
}
93 
94 /** @brief Enqueue operation, thread unsafe */
QueueSingleProducerEnqueue(LockFreeQueue * queue,const void * node)95 static inline int32_t QueueSingleProducerEnqueue(LockFreeQueue *queue, const void *node)
96 {
97     uint32_t producerHead;
98     uint32_t producerNext;
99     uint32_t consumerTail;
100     uint32_t availableCount;
101     uint32_t mask;
102 
103     if (queue == NULL || node == NULL) {
104         return QUEUE_INVAL;
105     }
106     mask = queue->producer.mask;
107 
108     producerHead = queue->producer.head;
109     RMB();
110     consumerTail = queue->consumer.tail;
111 
112     /*
113      * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
114      * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
115      *    producerHead < consumerTail + mask - 0xFFFFFFFF
116      * The subtraction of two 32-bit integers results in 32-bit modulo.
117      * Therefore, the availableCount must be between 0 and the queue length.
118      */
119     availableCount = (mask + consumerTail) - producerHead;
120 
121     if (availableCount < 1) {
122         return QUEUE_FULL;
123     }
124 
125     producerNext = producerHead + 1;
126     queue->producer.head = producerNext;
127 
128     queue->nodes[producerHead & mask] = (uintptr_t)node;
129 
130     /*
131      * Make sure that the queue is filled with elements before updating the producer tail.
132      * Prevents problems when the producer tail is updated first:
133      * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
134      *    but the consumer actually reads dirty elements.
135      * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
136      */
137     WMB();
138 
139     queue->producer.tail = producerNext;
140     return 0;
141 }
142 
143 /** @brief Enqueue operation, thread safe */
QueueMultiProducerEnqueue(LockFreeQueue * queue,const void * node)144 static inline int32_t QueueMultiProducerEnqueue(LockFreeQueue* queue, const void* node)
145 {
146     uint32_t producerHead;
147     uint32_t producerNext;
148     uint32_t consumerTail;
149     uint32_t availableCount;
150     bool success = false;
151     uint32_t mask;
152 
153     if (queue == NULL || node == NULL) {
154         return QUEUE_INVAL;
155     }
156 
157     mask = queue->producer.mask;
158     do {
159         producerHead = queue->producer.head;
160         /*
161          * Make sure the producer's head is read before the consumer's tail.
162          * If the consumer tail is read first, then the consumer consumes the queue,and then other producers
163          * produce the queue, the producer header may cross the consumer tail reversely.
164          */
165         RMB();
166         consumerTail = queue->consumer.tail;
167 
168         /*
169          * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
170          * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
171          *    producerHead < consumerTail + mask - 0xFFFFFFFF
172          * The subtraction of two 32-bit integers results in 32-bit modulo.
173          * Therefore, the availableCount must be between 0 and the queue length.
174          */
175 
176         availableCount = (mask + consumerTail) - producerHead;
177 
178         if (availableCount < 1) {
179             return QUEUE_FULL;
180         }
181 
182         producerNext = producerHead + 1;
183         success = SoftBusAtomicCmpAndSwap32(&queue->producer.head, producerHead, producerNext);
184     } while (success == false);
185 
186     queue->nodes[producerHead & mask] = (uintptr_t)node;
187 
188     /*
189      * Make sure that the queue is filled with elements before updating the producer tail.
190      * Prevents problems when the producer tail is updated first:
191      * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
192      *    but the consumer actually reads dirty elements.
193      * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
194      */
195 
196     WMB();
197 
198     /* Waiting for other producers to complete enqueuing. */
199     while (queue->producer.tail != producerHead) {
200         sched_yield();
201     }
202 
203     queue->producer.tail += 1;
204     return 0;
205 }
206 
207 /** @brief Dequeue operation, thread unsafe */
QueueSingleConsumerDequeue(LockFreeQueue * queue,void ** node)208 static inline int32_t QueueSingleConsumerDequeue(LockFreeQueue* queue, void** node)
209 {
210     uint32_t consumerHead;
211     uint32_t producerTail;
212     uint32_t consumerNext;
213     uint32_t availableCount;
214     uint32_t mask;
215 
216     if (queue == NULL || node == NULL) {
217         return QUEUE_INVAL;
218     }
219     mask = queue->producer.mask;
220 
221     consumerHead = queue->consumer.head;
222 
223     /* Prevent producerTail from being read before consumerHead, causing queue head and tail reversal. */
224     RMB();
225 
226     producerTail = queue->producer.tail;
227 
228     /*
229      * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
230      * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
231      *    producerTail < consumerHead + mask - 0xFFFFFFFF
232      * The subtraction of two 32-bit integers results in 32-bit modulo.
233      * Therefore, the availableCount must be between 0 and the queue length.
234      */
235     availableCount = (producerTail - consumerHead);
236 
237     if (availableCount < 1) {
238         return QUEUE_EMPTY;
239     }
240 
241     consumerNext = consumerHead + 1;
242     queue->consumer.head = consumerNext;
243 
244     /* Prevent the read of queue->nodes before the read of ProdTail. */
245     RMB();
246 
247     *node = (void *)(queue->nodes[consumerHead & mask]);
248 
249     /*
250      * Ensure that the queue element is dequeued before updating the consumer's tail.
251      * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
252      * and can fill in new elements, which actually overwrites the elements that are not dequeued.
253      */
254     RMB();
255 
256     queue->consumer.tail = consumerNext;
257     return 0;
258 }
259 
260 /** @brief Dequeue operation, thread safe */
QueueMultiConsumerDequeue(LockFreeQueue * queue,void ** node)261 static inline int32_t QueueMultiConsumerDequeue(LockFreeQueue *queue, void **node)
262 {
263     bool success = false;
264     uint32_t consumerHead;
265     uint32_t producerTail;
266     uint32_t consumerNext;
267     uint32_t availableCount;
268     uint32_t mask;
269 
270     if (queue == NULL || node == NULL) {
271         return QUEUE_INVAL;
272     }
273     mask = queue->producer.mask;
274 
275     do {
276         consumerHead = queue->consumer.head;
277 
278         /*
279          * Make sure the consumer's head is read before the producer's tail.
280          * If the producer tail is read first, then other consumers consume the queue,
281          * and finally the generator produces the queue, the consumer head may cross the producer tail.
282          */
283         RMB();
284 
285         producerTail = queue->producer.tail;
286 
287         /*
288          * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
289          * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
290          *    producerTail < consumerHead + mask - 0xFFFFFFFF
291          * The subtraction of two 32-bit integers results in 32-bit modulo.
292          * Therefore, the availableCount must be between 0 and the queue length.
293          */
294 
295         availableCount = (producerTail - consumerHead);
296 
297         if (availableCount < 1) {
298             return QUEUE_EMPTY;
299         }
300 
301         consumerNext = consumerHead + 1;
302         success = SoftBusAtomicCmpAndSwap32(&queue->consumer.head, consumerHead, consumerNext);
303     } while (success == false);
304 
305     /* Prevent the read of queue->nodes before the read of ProdTail. */
306     RMB();
307 
308     *node = (void *)(queue->nodes[consumerHead & mask]);
309 
310     /*
311      * Ensure that the queue element is dequeued before updating the consumer's tail.
312      * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
313      * and can fill in new elements, which actually overwrites the elements that are not dequeued.
314      */
315     RMB();
316 
317     /* Waiting for other consumers to finish dequeuing. */
318     while (queue->consumer.tail != consumerHead) {
319         sched_yield();
320     }
321 
322     queue->consumer.tail += 1;
323 
324     return 0;
325 }
326 
327 #ifdef __cplusplus
328 #if __cplusplus
329 }
330 #endif /* __cplusplus */
331 #endif /* __cplusplus */
332 #endif // SOFTBUS_QUEUE_H
333