1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "log.h"
17 #include "fillp_function.h"
18 #include "lf_ring.h"
19
20 #ifdef __cplusplus
21 extern "C" {
22 #endif
23
24
25 /*
26 * Function : FillpLfRingCalMemSize
27 *
28 * Description : This function will be invoked to calculate the memory size
29 * required for lock-free queue, based on size.
30 *
31 * Input : size - no of items of lock-free queue
32 *
33 * Output : None
34 *
35 * Return : Memory size of lock-free queue
36 */
FillpLfRingCalMemSize(size_t size)37 size_t FillpLfRingCalMemSize(size_t size)
38 {
39 size_t memSize = size * sizeof(void *);
40 if ((memSize == 0) || (memSize / sizeof(void *) != size)) {
41 return 0;
42 }
43
44 memSize = memSize + sizeof(struct FillpLfRing);
45 if (memSize < sizeof(struct FillpLfRing)) {
46 return 0;
47 }
48 return memSize;
49 }
50
51 /*
52 * Function : FillpLfRingInit
53 *
54 * Description : This function will be invoked to init the lock-free queue.
55 *
56 * Input : ring - lock-free queue to be initialized
57 * name - name to be added to the lock-free queue
58 * size - size of lock-free queue
59 *
60 * Output : ring - initialized lock-free queue
61 *
62 * Return : None
63 */
FillpLfRingInit(struct FillpLfRing * ring,char * name,size_t size)64 void FillpLfRingInit(struct FillpLfRing *ring, char *name, size_t size)
65 {
66 FILLP_UNUSED_PARA(name);
67 if (ring == FILLP_NULL_PTR) {
68 return;
69 }
70
71 if (size == 0) {
72 return;
73 }
74
75 ring->size = (FILLP_ULONG)size;
76
77 ring->cons.head = 0;
78 ring->cons.tail = 0;
79
80 ring->prod.head = 0;
81 ring->prod.tail = 0;
82 ring->consSafe = FILLP_TRUE;
83 ring->prodSafe = FILLP_TRUE;
84
85 (void)memset_s(ring->name, sizeof(ring->name), '\0', sizeof(ring->name));
86 return;
87 }
88
/*
 * Function    : FillpLfRingSetProdSafe
 *
 * Description : Selects how the enqueue path advances the producer head:
 *               with a compare-and-swap when safe is non-zero, with a
 *               plain store otherwise.
 *
 * Input       : ring - ring to configure; a NULL ring is ignored
 *               safe - new value of the prodSafe flag
 *
 * Output      : ring - prodSafe flag updated
 *
 * Return      : None
 */
void FillpLfRingSetProdSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    /* Tolerate a missing ring, as FillpLfRingInit does, instead of dereferencing NULL */
    if (!ring) {
        return;
    }

    ring->prodSafe = safe;
}
93
/*
 * Function    : FillpLfRingSetConsSafe
 *
 * Description : Selects how the dequeue path advances the consumer head:
 *               with a compare-and-swap when safe is non-zero, with a
 *               plain store otherwise.
 *
 * Input       : ring - ring to configure; a NULL ring is ignored
 *               safe - new value of the consSafe flag
 *
 * Output      : ring - consSafe flag updated
 *
 * Return      : None
 */
void FillpLfRingSetConsSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    /* Tolerate a missing ring, as FillpLfRingInit does, instead of dereferencing NULL */
    if (!ring) {
        return;
    }

    ring->consSafe = safe;
}
98
/*
 * Function    : FillpLfRingMpEnqueueWait
 *
 * Description : Reserves up to count free slots for a producer by moving
 *               ring->prod.head forward. With ring->prodSafe set the move
 *               is a compare-and-swap that is retried from a fresh
 *               snapshot whenever it fails; otherwise the head is written
 *               directly.
 *
 * Input       : ring  - ring to reserve slots in
 *               count - number of slots the caller would like
 *
 * Output      : prodHead - producer head read before the reservation
 *               prodNext - producer head after the reservation (written
 *                          only when at least one slot was reserved)
 *
 * Return      : Number of slots reserved; 0 when the ring has no free
 *               slot, in which case the head is left unchanged
 */
static FILLP_ULONG FillpLfRingMpEnqueueWait(struct FillpLfRing *ring, FILLP_UINT count,
    FILLP_ULONG *prodHead, FILLP_ULONG *prodNext)
{
    FILLP_ULONG consumerTail;
    FILLP_ULONG room;
    FILLP_ULONG granted;

    do {
        /* Snapshot both indices before computing anything from them */
        *prodHead = ring->prod.head;
        consumerTail = ring->cons.tail;

        sys_arch_compiler_barrier();

        /*
         * Unsigned arithmetic keeps this correct modulo the width of
         * FILLP_ULONG, even once the indices have wrapped around.
         */
        room = ring->size + consumerTail - *prodHead;
        if (room == 0) {
            return 0;
        }

        /* Hand out no more than was requested and no more than is free */
        if (room > count) {
            granted = count;
        } else {
            granted = room;
        }
        *prodNext = *prodHead + granted;

        /* Unsafe mode: store the new head directly, no retry needed */
        if (!ring->prodSafe) {
            ring->prod.head = *prodNext;
            break;
        }
    } while (unlikely(!CAS(&ring->prod.head, *prodHead, *prodNext)));

    return granted;
}
131
FillpLfRingMpEnqueue(struct FillpLfRing * ring,void ** dataTable,FILLP_UINT count)132 FillpErrorType FillpLfRingMpEnqueue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
133 {
134 FILLP_ULONG prodHead = 0;
135 FILLP_ULONG prodNext = 0;
136 FILLP_ULONG i;
137 FILLP_ULONG j;
138 FILLP_ULONG rep = 0;
139 FILLP_ULONG ret;
140
141 if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
142 return ERR_PARAM;
143 }
144 /* move prod.head atomically */
145 ret = FillpLfRingMpEnqueueWait(ring, count, &prodHead, &prodNext);
146 if (ret == 0) {
147 return ERR_NOBUFS;
148 }
149
150 /* write entries in ring */
151 for (i = 0, j = 1; i < (FILLP_UINT)ret; i++, j++) {
152 ring->ringCache[(prodHead + j) % ring->size] = dataTable[i];
153 }
154
155 sys_arch_compiler_barrier();
156
157 /*
158 * If there are other enqueues in progress that preceded us,
159 * we need to wait for them to complete
160 */
161 while (unlikely(ring->prod.tail != prodHead)) {
162 FILLP_RTE_PAUSE();
163
164 /* Set FTDP_RING_PAUSE_REP_COUNT to avoid spin too long waiting
165 * for other thread finish. It gives pre-empted thread a chance
166 * to proceed and finish with ring dequeue operation. */
167 #if LF_RING_PAUSE_REP_COUNT
168 if (++rep == LF_RING_PAUSE_REP_COUNT) {
169 rep = 0;
170 (void)SYS_ARCH_SCHED_YIELD();
171 }
172 #endif
173 }
174
175 ring->prod.tail = prodNext;
176
177 FILLP_UNUSED_PARA(rep);
178
179 return (FillpErrorType)ret;
180 }
181
/*
 * Function    : FillpLfRingMcDequeueWait
 *
 * Description : Claims up to count published entries for a consumer by
 *               moving ring->cons.head forward. With ring->consSafe set
 *               the move is a compare-and-swap that is retried from a
 *               fresh snapshot whenever it fails; otherwise the head is
 *               written directly.
 *
 * Input       : ring  - ring to claim entries from
 *               count - number of entries the caller would like
 *
 * Output      : consHead - consumer head read before the claim
 *               consNext - consumer head after the claim (written only
 *                          when at least one entry was claimed)
 *
 * Return      : Number of entries claimed; 0 when nothing is available,
 *               in which case the head is left unchanged
 */
static FILLP_ULONG FillpLfRingMcDequeueWait(struct FillpLfRing *ring, FILLP_UINT count, FILLP_ULONG *consHead,
    FILLP_ULONG *consNext)
{
    FILLP_ULONG producerTail;
    FILLP_ULONG available;
    FILLP_ULONG claimed;

    do {
        /* Snapshot both indices before computing anything from them */
        *consHead = ring->cons.head;
        producerTail = ring->prod.tail;

        sys_arch_compiler_barrier();

        /*
         * Unsigned arithmetic keeps this correct modulo the width of
         * FILLP_ULONG, even once the indices have wrapped around.
         */
        available = producerTail - *consHead;
        if (available == 0) {
            return 0;
        }

        /* Take no more than was requested and no more than is published */
        if (available > count) {
            claimed = count;
        } else {
            claimed = available;
        }
        *consNext = *consHead + claimed;

        /* Unsafe mode: store the new head directly, no retry needed */
        if (!ring->consSafe) {
            ring->cons.head = *consNext;
            break;
        }
    } while (unlikely(!CAS(&ring->cons.head, *consHead, *consNext)));

    return claimed;
}
214
FillpLfRingMcDequeue(struct FillpLfRing * ring,void ** dataTable,FILLP_UINT count)215 FILLP_INT FillpLfRingMcDequeue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
216 {
217 FILLP_ULONG consHead;
218 FILLP_ULONG consNext;
219 FILLP_ULONG rep = 0;
220 FILLP_ULONG i;
221 FILLP_ULONG j;
222 FILLP_ULONG ret;
223
224 if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
225 return ERR_PARAM;
226 }
227 /* move cons.head atomically */
228 ret = FillpLfRingMcDequeueWait(ring, count, &consHead, &consNext);
229 if (ret == 0) {
230 return ERR_NOBUFS;
231 }
232
233 /* copy in table */
234 for (i = 0, j = 1; i < ret; i++, j++) {
235 dataTable[i] = ring->ringCache[(consHead + j) % ring->size];
236 }
237
238 sys_arch_compiler_barrier();
239
240 /*
241 * If there are other dequeues in progress that preceded us,
242 * we need to wait for them to complete
243 */
244 while (unlikely(ring->cons.tail != consHead)) {
245 FILLP_RTE_PAUSE();
246
247 /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
248 * for other thread finish. It gives pre-empted thread a chance
249 * to proceed and finish with ring dequeue operation. */
250 #if LF_RING_PAUSE_REP_COUNT
251 if (++rep == LF_RING_PAUSE_REP_COUNT) {
252 rep = 0;
253 (void)SYS_ARCH_SCHED_YIELD();
254 }
255 #endif
256 }
257
258 ring->cons.tail = consNext;
259
260 FILLP_UNUSED_PARA(rep);
261
262 return (FILLP_INT)ret;
263 }
264
FillpRingEmpty(const struct FillpLfRing * r)265 FILLP_INT FillpRingEmpty(const struct FillpLfRing *r)
266 {
267 FILLP_ULONG prodTail = r->prod.tail;
268 FILLP_ULONG consTail = r->cons.tail;
269 return (consTail == prodTail);
270 }
271
272
FillpRingFreeEntries(const struct FillpLfRing * r)273 FILLP_INT FillpRingFreeEntries(const struct FillpLfRing *r)
274 {
275 FILLP_ULONG consTail;
276 FILLP_ULONG prodHead;
277 FILLP_ULONG remain;
278 FILLP_INT cnt;
279
280 if (r == FILLP_NULL_PTR) {
281 FILLP_LOGERR("ring is NULL pointer");
282 return 0;
283 }
284
285 consTail = r->cons.tail;
286 prodHead = r->prod.head;
287
288 remain = (r->size + consTail - prodHead);
289 cnt = (int)remain;
290 if (cnt < 0) {
291 FILLP_LOGERR("cnt is %d, real size:%lu", cnt, remain);
292 cnt = 0;
293 }
294
295 return cnt;
296 }
297
FillpRingValidOnes(struct FillpLfRing * r)298 FILLP_ULONG FillpRingValidOnes(struct FillpLfRing *r)
299 {
300 FILLP_ULONG prodTail;
301 FILLP_ULONG consHead;
302 FILLP_ULONG ret;
303 if (r == FILLP_NULL_PTR) {
304 return 0;
305 }
306
307 prodTail = r->prod.tail;
308 consHead = r->cons.head;
309
310 ret = prodTail - consHead;
311 if (((FILLP_SLONG)ret) < 0) {
312 ret = r->size;
313 }
314 return ret;
315 }
316
317 #ifdef __cplusplus
318 }
319 #endif
320