• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_conn_fair_priority_queue.h"
17 
18 #include "common_list.h"
19 #include "softbus_adapter_mem.h"
20 #include "softbus_adapter_thread.h"
21 #include "softbus_queue.h"
22 
23 #include "conn_log.h"
24 
25 #define FACTOR_S_MS_US 1000
26 
/*
 * Fill in the routing fields of a queue item.
 *
 * item:     item to initialise; a NULL item is logged and ignored.
 * id:       identifier stored in item->id.
 * priority: priority stored in item->priority.
 */
void ConnQueueItemConstruct(struct ConnQueueItem *item, int32_t id, ConnPriority priority)
{
    CONN_CHECK_AND_RETURN_LOGE(item != NULL, CONN_COMMON, "item is null");

    item->priority = priority;
    item->id = id;
}
33 
ConnQueueItemDestruct(struct ConnQueueItem * item)34 void ConnQueueItemDestruct(struct ConnQueueItem *item)
35 {
36     CONN_CHECK_AND_RETURN_LOGE(item != NULL, CONN_COMMON, "item is null");
37     (void)item;
38 }
39 
40 struct PriorityQueue {
41     ListNode node;
42 
43     int32_t id;
44     LockFreeQueue *queue[CONN_PRIORITY_BUTT];
45     uint32_t size;
46 };
47 
CreatePriorityQueue(int32_t id,uint32_t size)48 static struct PriorityQueue *CreatePriorityQueue(int32_t id, uint32_t size)
49 {
50     struct PriorityQueue *pq = SoftBusCalloc(sizeof(struct PriorityQueue));
51     CONN_CHECK_AND_RETURN_RET_LOGE(pq != NULL, NULL, CONN_COMMON, "malloc fail");
52 
53     ListInit(&pq->node);
54     pq->id = id;
55     pq->size = size;
56 
57     return pq;
58 }
59 
DestroyPriorityQueue(struct PriorityQueue * queue)60 static void DestroyPriorityQueue(struct PriorityQueue *queue)
61 {
62     CONN_CHECK_AND_RETURN_LOGE(queue != NULL, CONN_COMMON, "queue is null");
63     for (ConnPriority i = 0; i < CONN_PRIORITY_BUTT; i++) {
64         LockFreeQueue *lfq = queue->queue[i];
65         SoftBusFree(lfq);
66         queue->queue[i] = NULL;
67     }
68     SoftBusFree(queue);
69 }
70 
GetQueue(struct PriorityQueue * queue,ConnPriority priority,bool createIfEmpty)71 static LockFreeQueue *GetQueue(struct PriorityQueue *queue, ConnPriority priority, bool createIfEmpty)
72 {
73     CONN_CHECK_AND_RETURN_RET_LOGE(
74         priority < CONN_PRIORITY_BUTT, NULL, CONN_COMMON, "priority out of bound: priority=%{public}d", priority);
75 
76     LockFreeQueue *lfq = queue->queue[priority];
77     if (!createIfEmpty || lfq != NULL) {
78         return lfq;
79     }
80     lfq = CreateQueue(queue->size);
81     CONN_CHECK_AND_RETURN_RET_LOGE(lfq != NULL, NULL, CONN_COMMON, "create queue fail");
82     queue->queue[priority] = lfq;
83     return lfq;
84 }
85 
Enqueue(struct PriorityQueue * queue,struct ConnQueueItem * item)86 static int32_t Enqueue(struct PriorityQueue *queue, struct ConnQueueItem *item)
87 {
88     ConnPriority priority = item->priority;
89     LockFreeQueue *lfq = GetQueue(queue, priority, true);
90     CONN_CHECK_AND_RETURN_RET_LOGE(lfq != NULL, SOFTBUS_MALLOC_ERR, CONN_COMMON,
91         "get queue fail, id=%{public}d, priority=%{public}d", item->id, priority);
92     int32_t ret = QueueMultiProducerEnqueue(lfq, item);
93     return ret;
94 }
95 
/*
 * Take one item from a bundle, scanning priority levels 0..least in
 * ascending numeric order.
 *
 * Levels whose queue was never created, or is currently empty, are skipped.
 * The first result that is not QUEUE_EMPTY (success or error) is returned
 * as-is; QUEUE_EMPTY is returned when every scanned level had nothing.
 */
static int32_t Dequeue(struct PriorityQueue *queue, ConnPriority least, struct ConnQueueItem **out)
{
    for (ConnPriority level = 0; level <= least; level++) {
        LockFreeQueue *lfq = GetQueue(queue, level, false);
        if (lfq != NULL) {
            int32_t ret = QueueSingleConsumerDequeue(lfq, (void **)out);
            if (ret != QUEUE_EMPTY) {
                return ret;
            }
        }
    }
    return QUEUE_EMPTY;
}
111 
112 struct ConnFairPriorityQueue {
113     uint32_t size;
114 
115     SoftBusMutex lock;
116     SoftBusCond enqueueCondition;
117     SoftBusCond dequeueCondition;
118 
119     ListNode queues;
120     struct PriorityQueue *innerQueue;
121 };
122 
ConnCreateQueue(uint32_t size)123 ConnFairPriorityQueue *ConnCreateQueue(uint32_t size)
124 {
125     ConnFairPriorityQueue *queue = SoftBusCalloc(sizeof(ConnFairPriorityQueue));
126     CONN_CHECK_AND_RETURN_RET_LOGE(queue != NULL, NULL, CONN_COMMON, "malloc fail");
127 
128     queue->size = size;
129     SoftBusMutexAttr attr = {
130         .type = SOFTBUS_MUTEX_RECURSIVE,
131     };
132     int32_t ret = SoftBusMutexInit(&queue->lock, &attr);
133     if (ret != SOFTBUS_OK) {
134         CONN_LOGE(CONN_COMMON, "init lock fail: error=%{public}d", ret);
135         goto CLEANUP;
136     }
137     ret = SoftBusCondInit(&queue->dequeueCondition);
138     if (ret != SOFTBUS_OK) {
139         CONN_LOGE(CONN_COMMON, "init dequeue condition fail: error=%{public}d", ret);
140         goto CLEANUP;
141     }
142     ret = SoftBusCondInit(&queue->enqueueCondition);
143     if (ret != SOFTBUS_OK) {
144         CONN_LOGE(CONN_COMMON, "init dequeue condition fail: error=%{public}d", ret);
145         goto CLEANUP;
146     }
147     ListInit(&queue->queues);
148     queue->innerQueue = CreatePriorityQueue(0, queue->size);
149     if (queue->innerQueue == NULL) {
150         CONN_LOGE(CONN_COMMON, "create inner priority queue fail");
151         goto CLEANUP;
152     }
153     return queue;
154 CLEANUP:
155     SoftBusMutexDestroy(&queue->lock);
156     SoftBusCondDestroy(&queue->dequeueCondition);
157     SoftBusCondDestroy(&queue->enqueueCondition);
158     DestroyPriorityQueue(queue->innerQueue);
159     SoftBusFree(queue);
160     return NULL;
161 }
162 
/*
 * Tear down a queue created by ConnCreateQueue.
 *
 * Destroys the lock and both conditions, then frees every per-id bundle,
 * the inner bundle and finally the queue object itself. A NULL queue is
 * logged and ignored. Items still stored in the queues are not released here.
 */
void ConnDestroyQueue(ConnFairPriorityQueue *queue)
{
    CONN_CHECK_AND_RETURN_LOGE(queue != NULL, CONN_COMMON, "queue is null");

    SoftBusMutexDestroy(&queue->lock);
    SoftBusCondDestroy(&queue->enqueueCondition);
    SoftBusCondDestroy(&queue->dequeueCondition);

    /* Safe iteration: each bundle is unlinked and freed while walking the list. */
    struct PriorityQueue *current = NULL;
    struct PriorityQueue *following = NULL;
    LIST_FOR_EACH_ENTRY_SAFE(current, following, &queue->queues, struct PriorityQueue, node) {
        ListDelete(&current->node);
        DestroyPriorityQueue(current);
    }

    DestroyPriorityQueue(queue->innerQueue);
    SoftBusFree(queue);
}
180 
/*
 * Resolve the bundle that serves a given id.
 *
 * id 0 always maps to the inner bundle. Any other id is searched for in
 * queue->queues; when absent, a new bundle is created and appended to the
 * tail of that list.
 *
 * Returns the bundle, or NULL when a new bundle could not be allocated.
 */
static struct PriorityQueue *GetOrCreatePriorityQueue(ConnFairPriorityQueue *queue, int32_t id)
{
    if (id == 0) {
        return queue->innerQueue;
    }

    struct PriorityQueue *candidate = NULL;
    LIST_FOR_EACH_ENTRY(candidate, &queue->queues, struct PriorityQueue, node) {
        if (candidate->id == id) {
            return candidate;
        }
    }

    /* No bundle for this id yet: create one and queue it behind the existing ones. */
    struct PriorityQueue *pq = CreatePriorityQueue(id, queue->size);
    CONN_CHECK_AND_RETURN_RET_LOGE(pq != NULL, NULL, CONN_COMMON, "create priority queue fail");
    ListTailInsert(&queue->queues, &pq->node);
    return pq;
}
198 
/*
 * Wait on a condition, with or without a timeout.
 *
 * condition: condition to wait on.
 * mutex:     mutex passed through to SoftBusCondWait.
 * timeoutMs: negative means wait without a deadline (NULL time is passed);
 *            otherwise the wait is bounded by "current time + timeoutMs".
 *
 * Returns the error from SoftBusGetTime when the current time cannot be
 * read, otherwise the result of SoftBusCondWait.
 */
static int32_t WaitCondition(SoftBusCond *condition, SoftBusMutex *mutex, int32_t timeoutMs)
{
    if (timeoutMs < 0) {
        return SoftBusCondWait(condition, mutex, NULL);
    }
    SoftBusSysTime now = { 0 };
    int32_t ret = SoftBusGetTime(&now);
    CONN_CHECK_AND_RETURN_RET_LOGE(ret == SOFTBUS_OK, ret, CONN_COMMON, "get time fail: error=%{public}d", ret);

    /*
     * Widen before multiplying: in 32-bit int arithmetic the product
     * timeoutMs * 1000 overflows (undefined behaviour) for timeouts above
     * roughly 2,147,483 ms.
     */
    const int64_t usPerSecond = (int64_t)FACTOR_S_MS_US * FACTOR_S_MS_US;
    int64_t us = (int64_t)timeoutMs * FACTOR_S_MS_US + now.usec;
    /* Carry whole seconds out of the microsecond field. */
    now.sec += us / usPerSecond;
    now.usec = us % usPerSecond;
    return SoftBusCondWait(condition, mutex, &now);
}
213 
/*
 * Put an item into the fair priority queue, waiting once if its target
 * queue is full.
 *
 * queue:     target queue; NULL yields SOFTBUS_INVALID_PARAM.
 * item:      item to store; NULL yields SOFTBUS_INVALID_PARAM. item->id
 *            selects the bundle and item->priority the level inside it.
 * timeoutMs: bound for the wait on a full queue; negative waits without a
 *            deadline.
 *
 * Returns SOFTBUS_OK on success (consumers are then woken through
 * dequeueCondition), the lock or wait error, SOFTBUS_MALLOC_ERR when no bundle
 * could be obtained, any non-QUEUE_FULL enqueue error, or
 * SOFTBUS_CONN_INTERNAL_ERR when the queue is still full after one wake-up.
 *
 * NOTE(review): only one wait is attempted, so a waiter that is woken but
 * finds the queue full again fails instead of waiting for the remaining time.
 */
int32_t ConnEnqueue(ConnFairPriorityQueue *queue, struct ConnQueueItem *item, int32_t timeoutMs)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(queue != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "queue is null");
    CONN_CHECK_AND_RETURN_RET_LOGW(item != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "item is null");

    int32_t code = SoftBusMutexLock(&queue->lock);
    CONN_CHECK_AND_RETURN_RET_LOGW(code == SOFTBUS_OK, code, CONN_COMMON, "lock queue fail: error=%{public}d", code);

    bool alreadyWaited = false;
    for (;;) {
        /* Resolved on every pass: the bundle may have been destroyed while waiting. */
        struct PriorityQueue *target = GetOrCreatePriorityQueue(queue, item->id);
        if (target == NULL) {
            code = SOFTBUS_MALLOC_ERR;
            CONN_LOGE(CONN_COMMON,
                "enqueue fail: get queue fail, id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
                item->priority, code);
            break;
        }

        code = Enqueue(target, item);
        if (code == SOFTBUS_OK) {
            break;
        }
        if (code != QUEUE_FULL) {
            CONN_LOGE(CONN_COMMON, "enqueue fail: id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
                item->priority, code);
            break;
        }

        /* Queue is full from here on. */
        if (alreadyWaited) {
            CONN_LOGE(CONN_COMMON, "can not enqueue item after being awake up");
            code = SOFTBUS_CONN_INTERNAL_ERR;
            break;
        }
        CONN_LOGE(CONN_COMMON, "queue is full, id=%{public}d, priority=%{public}d", item->id, item->priority);
        code = WaitCondition(&queue->enqueueCondition, &queue->lock, timeoutMs);
        if (code != SOFTBUS_OK) {
            CONN_LOGE(CONN_COMMON,
                "wait enqueue condition fail: id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
                item->priority, code);
            break;
        }
        alreadyWaited = true;
    }

    if (code == SOFTBUS_OK) {
        SoftBusCondBroadcast(&queue->dequeueCondition);
    }
    (void)SoftBusMutexUnlock(&queue->lock);
    return code;
}
261 
/*
 * Take one item from the inner (id 0) bundle, scanning levels 0..least.
 *
 * Any result other than success or QUEUE_EMPTY is logged; the result of
 * Dequeue is returned unchanged in all cases.
 */
static int32_t DequeueInner(ConnFairPriorityQueue *queue, ConnPriority least, struct ConnQueueItem **outMsg)
{
    int32_t ret = Dequeue(queue->innerQueue, least, outMsg);
    bool expected = (ret == SOFTBUS_OK || ret == QUEUE_EMPTY);
    if (!expected) {
        CONN_LOGE(CONN_COMMON, "get item from inner queue fail: error=%{public}d", ret);
    }
    return ret;
}
270 
/*
 * Take one item from the per-id bundles, rotating between them.
 *
 * Bundles are visited from the head of queue->queues. Each visited bundle is
 * unlinked first; an empty one is destroyed and the walk moves on, while the
 * first bundle that yields a result (success or error) is re-appended at the
 * tail so that other ids are served before it is visited again.
 *
 * Returns that bundle's Dequeue result, or QUEUE_EMPTY when no bundle had
 * anything to offer.
 */
static int32_t DequeueFairly(ConnFairPriorityQueue *queue, struct ConnQueueItem **outMsg)
{
    struct PriorityQueue *current = NULL;
    struct PriorityQueue *following = NULL;
    LIST_FOR_EACH_ENTRY_SAFE(current, following, &queue->queues, struct PriorityQueue, node) {
        ListDelete(&current->node);
        int32_t ret = Dequeue(current, CONN_PRIORITY_LOW, outMsg);
        if (ret != QUEUE_EMPTY) {
            /* Move the served bundle to the back of the rotation. */
            ListTailInsert(&queue->queues, &current->node);
            if (ret != SOFTBUS_OK) {
                CONN_LOGE(CONN_COMMON, "get item from queue fail: pid=%{public}d, error=%{public}d", current->id, ret);
            }
            return ret;
        }
        /* Nothing left for this id: drop its bundle and try the next one. */
        DestroyPriorityQueue(current);
    }
    return QUEUE_EMPTY;
}
290 
/*
 * Take the next item from the fair priority queue, waiting once if nothing
 * is available.
 *
 * queue:     source queue; NULL yields SOFTBUS_INVALID_PARAM.
 * outMsg:    receives the dequeued item; NULL yields SOFTBUS_INVALID_PARAM.
 * timeoutMs: bound for the wait on an empty queue; negative waits without a
 *            deadline.
 *
 * Sources are tried in this order, stopping at the first result that is not
 * QUEUE_EMPTY:
 *   1. the inner bundle, levels 0..CONN_PRIORITY_MIDDLE;
 *   2. the per-id bundles, served in rotation;
 *   3. the inner bundle again, levels 0..CONN_PRIORITY_LOW.
 *
 * Returns SOFTBUS_OK on success (producers are then woken through
 * enqueueCondition), the lock or wait error (including SOFTBUS_TIMOUT), a
 * dequeue error, or SOFTBUS_CONN_INTERNAL_ERR when everything is still empty
 * after one wake-up.
 */
int32_t ConnDequeue(ConnFairPriorityQueue *queue, struct ConnQueueItem **outMsg, int32_t timeoutMs)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(queue != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "queue is null");
    CONN_CHECK_AND_RETURN_RET_LOGW(outMsg != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "out item is null");

    int32_t code = SoftBusMutexLock(&queue->lock);
    CONN_CHECK_AND_RETURN_RET_LOGW(code == SOFTBUS_OK, code, CONN_COMMON, "lock queue fail: error=%{public}d", code);

    bool alreadyWaited = false;
    for (;;) {
        /* Each later source is consulted only while the earlier ones were empty. */
        code = DequeueInner(queue, CONN_PRIORITY_MIDDLE, outMsg);
        if (code == QUEUE_EMPTY) {
            code = DequeueFairly(queue, outMsg);
        }
        if (code == QUEUE_EMPTY) {
            code = DequeueInner(queue, CONN_PRIORITY_LOW, outMsg);
        }
        if (code != QUEUE_EMPTY) {
            break;
        }

        if (alreadyWaited) {
            CONN_LOGE(CONN_COMMON, "can not dequeue item after being awake up");
            code = SOFTBUS_CONN_INTERNAL_ERR;
            break;
        }

        code = WaitCondition(&queue->dequeueCondition, &queue->lock, timeoutMs);
        if (code == SOFTBUS_TIMOUT) {
            CONN_LOGE(CONN_COMMON, "wait dequeue condition timeout");
            break;
        }
        if (code != SOFTBUS_OK) {
            CONN_LOGE(CONN_COMMON, "wait dequeue condition fail: error=%{public}d", code);
            break;
        }
        alreadyWaited = true;
    }

    if (code == SOFTBUS_OK) {
        SoftBusCondBroadcast(&queue->enqueueCondition);
    }
    (void)SoftBusMutexUnlock(&queue->lock);
    return code;
}
335