/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "log.h"
#include "fillp_function.h"
#include "lf_ring.h"

#ifdef __cplusplus
extern "C" {
#endif

/*
 * Function    : FillpLfRingCalMemSize
 *
 * Description : This function is invoked to calculate the memory size
 * required for the lock-free queue, based on its size.
 *
 * Input       : size - number of items in the lock-free queue
 *
 * Output      : None
 *
 * Return      : Memory size of the lock-free queue, or 0 on overflow
 */
size_t FillpLfRingCalMemSize(size_t size)
{
    size_t memSize = size * sizeof(void *);
    if ((memSize == 0) || (memSize / sizeof(void *) != size)) {
        return 0;
    }

    memSize = memSize + sizeof(struct FillpLfRing);
    if (memSize < sizeof(struct FillpLfRing)) {
        return 0;
    }
    return memSize;
}
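
/*
 * Illustrative note (not part of the original source): both guards above are
 * overflow checks. On a hypothetical 32-bit build (sizeof(void *) == 4),
 * size == 0x40000001 makes size * sizeof(void *) wrap to 4, and the division
 * check (4 / sizeof(void *) != size) rejects it; the second guard rejects the
 * case where adding the struct header itself wraps past SIZE_MAX.
 */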

/*
 * Function    : FillpLfRingInit
 *
 * Description : This function is invoked to initialize the lock-free queue.
 *
 * Input       : ring - lock-free queue to be initialized
 *               name - name of the lock-free queue (currently unused)
 *               size - size of the lock-free queue
 *
 * Output      : ring - initialized lock-free queue
 *
 * Return      : None
 */
void FillpLfRingInit(struct FillpLfRing *ring, char *name, size_t size)
{
    FILLP_UNUSED_PARA(name);
    if ((ring == FILLP_NULL_PTR) || (size == 0)) {
        return;
    }

    ring->size = (FILLP_ULONG)size;

    ring->cons.head = 0;
    ring->cons.tail = 0;

    ring->prod.head = 0;
    ring->prod.tail = 0;
    ring->consSafe = FILLP_TRUE;
    ring->prodSafe = FILLP_TRUE;

    (void)memset_s(ring->name, sizeof(ring->name), '\0', sizeof(ring->name));
}

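/*
 * Usage sketch (illustrative, not part of the original source): size the
 * ring, allocate it and initialize it. malloc() stands in for whatever
 * allocator the caller actually uses, and the capacity 64 is arbitrary.
 *
 *     size_t memSize = FillpLfRingCalMemSize(64);
 *     struct FillpLfRing *ring =
 *         (memSize != 0) ? (struct FillpLfRing *)malloc(memSize) : FILLP_NULL_PTR;
 *     if (ring != FILLP_NULL_PTR) {
 *         FillpLfRingInit(ring, FILLP_NULL_PTR, 64); // name is unused
 *     }
 */
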
void FillpLfRingSetProdSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    ring->prodSafe = safe;
}

void FillpLfRingSetConsSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    ring->consSafe = safe;
}

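/*
 * Note (added for clarity): prodSafe/consSafe select between the
 * multi-producer/multi-consumer CAS paths and plain single-producer/
 * single-consumer fast paths in the wait loops below; with only one
 * producer (consumer) thread, the CAS can be skipped safely.
 */
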
static FILLP_ULONG FillpLfRingMpEnqueueWait(struct FillpLfRing *ring, FILLP_UINT count,
    FILLP_ULONG *prodHead, FILLP_ULONG *prodNext)
{
    FILLP_ULONG consTail;
    FILLP_ULONG freeEntries;
    FILLP_ULONG ret;
    do {
        *prodHead = ring->prod.head;
        consTail = ring->cons.tail;

        sys_arch_compiler_barrier();
        /* The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * prodHead > consTail), so 'freeEntries' is always between 0
         * and size(ring). */
        freeEntries = (ring->size + consTail - *prodHead);

        /* check that we have enough room in the ring */
        if (freeEntries == 0) {
            return 0;
        }
        ret = ((freeEntries > count) ? count : freeEntries);
        *prodNext = *prodHead + ret;

        if (!ring->prodSafe) {
            ring->prod.head = *prodNext;
            break;
        }
    } while (unlikely(!CAS(&ring->prod.head, *prodHead, *prodNext)));

    return ret;
}
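
/*
 * Worked example (illustrative): with ring->size == 8, consTail == 5 and
 * *prodHead == 10, the ring holds 10 - 5 == 5 in-flight entries, so
 * freeEntries == 8 + 5 - 10 == 3. Because the operands are unsigned, the
 * arithmetic stays correct even after the head/tail counters wrap.
 */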

FillpErrorType FillpLfRingMpEnqueue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
{
    FILLP_ULONG prodHead = 0;
    FILLP_ULONG prodNext = 0;
    FILLP_ULONG i;
    FILLP_ULONG j;
    FILLP_ULONG rep = 0;
    FILLP_ULONG ret;

    if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
        return ERR_PARAM;
    }
    /* move prod.head atomically */
    ret = FillpLfRingMpEnqueueWait(ring, count, &prodHead, &prodNext);
    if (ret == 0) {
        return ERR_NOBUFS;
    }

    /* write entries into the ring */
    for (i = 0, j = 1; i < ret; i++, j++) {
        ring->ringCache[(prodHead + j) % ring->size] = dataTable[i];
    }

    sys_arch_compiler_barrier();

    /*
     * If there are other enqueues in progress that preceded us,
     * we need to wait for them to complete
     */
    while (unlikely(ring->prod.tail != prodHead)) {
        FILLP_RTE_PAUSE();

        /* LF_RING_PAUSE_REP_COUNT bounds how long we spin waiting for the
         * other threads to finish: it gives a pre-empted thread a chance
         * to proceed and finish its ring enqueue operation. */
#if LF_RING_PAUSE_REP_COUNT
        if (++rep == LF_RING_PAUSE_REP_COUNT) {
            rep = 0;
            (void)SYS_ARCH_SCHED_YIELD();
        }
#endif
    }

    ring->prod.tail = prodNext;

    FILLP_UNUSED_PARA(rep);

    return (FillpErrorType)ret;
}
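
/*
 * Usage sketch (illustrative, not part of the original source): a producer
 * enqueues a small batch and handles a momentarily full ring. The batch
 * size and the retry policy are assumptions for this sketch only.
 *
 *     void *batch[4]; // filled by the caller
 *     FillpErrorType ret = FillpLfRingMpEnqueue(ring, batch, 4);
 *     if (ret == ERR_NOBUFS) {
 *         // ring full: retry later or drop the batch
 *     } else if (ret > 0) {
 *         // ret entries were accepted; ret may be less than 4
 *     }
 */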

static FILLP_ULONG FillpLfRingMcDequeueWait(struct FillpLfRing *ring, FILLP_UINT count, FILLP_ULONG *consHead,
    FILLP_ULONG *consNext)
{
    FILLP_ULONG prodTail;
    FILLP_ULONG entries;
    FILLP_ULONG ret;

    do {
        *consHead = ring->cons.head;
        prodTail = ring->prod.tail;
        sys_arch_compiler_barrier();
        /* The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * consHead > prodTail), so 'entries' is always between 0
         * and size(ring). */
        entries = (prodTail - *consHead);

        /* Set the actual number of entries for dequeue */
        if (entries == 0) {
            return 0;
        }
        ret = ((entries > count) ? count : entries);
        *consNext = *consHead + ret;

        if (!ring->consSafe) {
            ring->cons.head = *consNext;
            break;
        }
    } while (unlikely(!CAS(&ring->cons.head, *consHead, *consNext)));

    return ret;
}

FILLP_INT FillpLfRingMcDequeue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
{
    FILLP_ULONG consHead;
    FILLP_ULONG consNext;
    FILLP_ULONG rep = 0;
    FILLP_ULONG i;
    FILLP_ULONG j;
    FILLP_ULONG ret;

    if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
        return ERR_PARAM;
    }
    /* move cons.head atomically */
    ret = FillpLfRingMcDequeueWait(ring, count, &consHead, &consNext);
    if (ret == 0) {
        return ERR_NOBUFS;
    }

    /* copy entries out into the table */
    for (i = 0, j = 1; i < ret; i++, j++) {
        dataTable[i] = ring->ringCache[(consHead + j) % ring->size];
    }

    sys_arch_compiler_barrier();

    /*
     * If there are other dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    while (unlikely(ring->cons.tail != consHead)) {
        FILLP_RTE_PAUSE();

        /* LF_RING_PAUSE_REP_COUNT bounds how long we spin waiting for the
         * other threads to finish: it gives a pre-empted thread a chance
         * to proceed and finish its ring dequeue operation. */
#if LF_RING_PAUSE_REP_COUNT
        if (++rep == LF_RING_PAUSE_REP_COUNT) {
            rep = 0;
            (void)SYS_ARCH_SCHED_YIELD();
        }
#endif
    }

    ring->cons.tail = consNext;

    FILLP_UNUSED_PARA(rep);

    return (FILLP_INT)ret;
}
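
/*
 * Usage sketch (illustrative, not part of the original source): a consumer
 * drains up to a batch of entries. ProcessEntry() is a hypothetical handler
 * used only for this sketch.
 *
 *     void *batch[16];
 *     FILLP_INT k;
 *     FILLP_INT got = FillpLfRingMcDequeue(ring, batch, 16);
 *     if (got > 0) {
 *         for (k = 0; k < got; k++) {
 *             ProcessEntry(batch[k]);
 *         }
 *     }
 */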

FILLP_INT FillpRingEmpty(const struct FillpLfRing *r)
{
    FILLP_ULONG prodTail = r->prod.tail;
    FILLP_ULONG consTail = r->cons.tail;
    return (consTail == prodTail);
}


FILLP_INT FillpRingFreeEntries(const struct FillpLfRing *r)
{
    FILLP_ULONG consTail;
    FILLP_ULONG prodHead;
    FILLP_ULONG remain;
    FILLP_INT cnt;

    if (r == FILLP_NULL_PTR) {
        FILLP_LOGERR("ring is NULL pointer");
        return 0;
    }

    consTail = r->cons.tail;
    prodHead = r->prod.head;

    remain = (r->size + consTail - prodHead);
    cnt = (FILLP_INT)remain;
    if (cnt < 0) {
        FILLP_LOGERR("cnt is %d, real size:%lu", cnt, remain);
        cnt = 0;
    }

    return cnt;
}

FILLP_ULONG FillpRingValidOnes(struct FillpLfRing *r)
{
    FILLP_ULONG prodTail;
    FILLP_ULONG consHead;
    FILLP_ULONG ret;
    if (r == FILLP_NULL_PTR) {
        return 0;
    }

    prodTail = r->prod.tail;
    consHead = r->cons.head;

    ret = prodTail - consHead;
    if (((FILLP_SLONG)ret) < 0) {
        ret = r->size;
    }
    return ret;
}
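
/*
 * Worked example (illustrative): with prod.tail == 12 and cons.head == 9,
 * FillpRingValidOnes() returns 3. Because the two fields are read without
 * synchronization, a stale prod.tail combined with a fresh cons.head can
 * make the unsigned difference wrap to a huge value; the signed check
 * detects this and falls back to r->size.
 */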

#ifdef __cplusplus
}
#endif