1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_conn_fair_priority_queue.h"
17
18 #include "common_list.h"
19 #include "softbus_adapter_mem.h"
20 #include "softbus_adapter_thread.h"
21 #include "softbus_queue.h"
22
23 #include "conn_log.h"
24
25 #define FACTOR_S_MS_US 1000
26
ConnQueueItemConstruct(struct ConnQueueItem * item,int32_t id,ConnPriority priority)27 void ConnQueueItemConstruct(struct ConnQueueItem *item, int32_t id, ConnPriority priority)
28 {
29 CONN_CHECK_AND_RETURN_LOGE(item != NULL, CONN_COMMON, "item is null");
30 item->id = id;
31 item->priority = priority;
32 }
33
ConnQueueItemDestruct(struct ConnQueueItem * item)34 void ConnQueueItemDestruct(struct ConnQueueItem *item)
35 {
36 CONN_CHECK_AND_RETURN_LOGE(item != NULL, CONN_COMMON, "item is null");
37 (void)item;
38 }
39
40 struct PriorityQueue {
41 ListNode node;
42
43 int32_t id;
44 LockFreeQueue *queue[CONN_PRIORITY_BUTT];
45 uint32_t size;
46 };
47
CreatePriorityQueue(int32_t id,uint32_t size)48 static struct PriorityQueue *CreatePriorityQueue(int32_t id, uint32_t size)
49 {
50 struct PriorityQueue *pq = SoftBusCalloc(sizeof(struct PriorityQueue));
51 CONN_CHECK_AND_RETURN_RET_LOGE(pq != NULL, NULL, CONN_COMMON, "malloc fail");
52
53 ListInit(&pq->node);
54 pq->id = id;
55 pq->size = size;
56
57 return pq;
58 }
59
DestroyPriorityQueue(struct PriorityQueue * queue)60 static void DestroyPriorityQueue(struct PriorityQueue *queue)
61 {
62 CONN_CHECK_AND_RETURN_LOGE(queue != NULL, CONN_COMMON, "queue is null");
63 for (ConnPriority i = 0; i < CONN_PRIORITY_BUTT; i++) {
64 LockFreeQueue *lfq = queue->queue[i];
65 SoftBusFree(lfq);
66 queue->queue[i] = NULL;
67 }
68 SoftBusFree(queue);
69 }
70
GetQueue(struct PriorityQueue * queue,ConnPriority priority,bool createIfEmpty)71 static LockFreeQueue *GetQueue(struct PriorityQueue *queue, ConnPriority priority, bool createIfEmpty)
72 {
73 CONN_CHECK_AND_RETURN_RET_LOGE(
74 priority < CONN_PRIORITY_BUTT, NULL, CONN_COMMON, "priority out of bound: priority=%{public}d", priority);
75
76 LockFreeQueue *lfq = queue->queue[priority];
77 if (!createIfEmpty || lfq != NULL) {
78 return lfq;
79 }
80 lfq = CreateQueue(queue->size);
81 CONN_CHECK_AND_RETURN_RET_LOGE(lfq != NULL, NULL, CONN_COMMON, "create queue fail");
82 queue->queue[priority] = lfq;
83 return lfq;
84 }
85
Enqueue(struct PriorityQueue * queue,struct ConnQueueItem * item)86 static int32_t Enqueue(struct PriorityQueue *queue, struct ConnQueueItem *item)
87 {
88 ConnPriority priority = item->priority;
89 LockFreeQueue *lfq = GetQueue(queue, priority, true);
90 CONN_CHECK_AND_RETURN_RET_LOGE(lfq != NULL, SOFTBUS_MALLOC_ERR, CONN_COMMON,
91 "get queue fail, id=%{public}d, priority=%{public}d", item->id, priority);
92 int32_t ret = QueueMultiProducerEnqueue(lfq, item);
93 return ret;
94 }
95
Dequeue(struct PriorityQueue * queue,ConnPriority least,struct ConnQueueItem ** out)96 static int32_t Dequeue(struct PriorityQueue *queue, ConnPriority least, struct ConnQueueItem **out)
97 {
98 for (ConnPriority priority = 0; priority <= least; priority++) {
99 LockFreeQueue *lfq = GetQueue(queue, (ConnPriority)priority, false);
100 if (lfq == NULL) {
101 continue;
102 }
103 int32_t ret = QueueSingleConsumerDequeue(lfq, (void **)out);
104 if (ret == QUEUE_EMPTY) {
105 continue;
106 }
107 return ret;
108 }
109 return QUEUE_EMPTY;
110 }
111
112 struct ConnFairPriorityQueue {
113 uint32_t size;
114
115 SoftBusMutex lock;
116 SoftBusCond enqueueCondition;
117 SoftBusCond dequeueCondition;
118
119 ListNode queues;
120 struct PriorityQueue *innerQueue;
121 };
122
ConnCreateQueue(uint32_t size)123 ConnFairPriorityQueue *ConnCreateQueue(uint32_t size)
124 {
125 ConnFairPriorityQueue *queue = SoftBusCalloc(sizeof(ConnFairPriorityQueue));
126 CONN_CHECK_AND_RETURN_RET_LOGE(queue != NULL, NULL, CONN_COMMON, "malloc fail");
127
128 queue->size = size;
129 SoftBusMutexAttr attr = {
130 .type = SOFTBUS_MUTEX_RECURSIVE,
131 };
132 int32_t ret = SoftBusMutexInit(&queue->lock, &attr);
133 if (ret != SOFTBUS_OK) {
134 CONN_LOGE(CONN_COMMON, "init lock fail: error=%{public}d", ret);
135 goto CLEANUP;
136 }
137 ret = SoftBusCondInit(&queue->dequeueCondition);
138 if (ret != SOFTBUS_OK) {
139 CONN_LOGE(CONN_COMMON, "init dequeue condition fail: error=%{public}d", ret);
140 goto CLEANUP;
141 }
142 ret = SoftBusCondInit(&queue->enqueueCondition);
143 if (ret != SOFTBUS_OK) {
144 CONN_LOGE(CONN_COMMON, "init dequeue condition fail: error=%{public}d", ret);
145 goto CLEANUP;
146 }
147 ListInit(&queue->queues);
148 queue->innerQueue = CreatePriorityQueue(0, queue->size);
149 if (queue->innerQueue == NULL) {
150 CONN_LOGE(CONN_COMMON, "create inner priority queue fail");
151 goto CLEANUP;
152 }
153 return queue;
154 CLEANUP:
155 SoftBusMutexDestroy(&queue->lock);
156 SoftBusCondDestroy(&queue->dequeueCondition);
157 SoftBusCondDestroy(&queue->enqueueCondition);
158 DestroyPriorityQueue(queue->innerQueue);
159 SoftBusFree(queue);
160 return NULL;
161 }
162
ConnDestroyQueue(ConnFairPriorityQueue * queue)163 void ConnDestroyQueue(ConnFairPriorityQueue *queue)
164 {
165 CONN_CHECK_AND_RETURN_LOGE(queue != NULL, CONN_COMMON, "queue is null");
166
167 SoftBusMutexDestroy(&queue->lock);
168 SoftBusCondDestroy(&queue->enqueueCondition);
169 SoftBusCondDestroy(&queue->dequeueCondition);
170
171 struct PriorityQueue *it = NULL;
172 struct PriorityQueue *next = NULL;
173 LIST_FOR_EACH_ENTRY_SAFE(it, next, &queue->queues, struct PriorityQueue, node) {
174 ListDelete(&it->node);
175 DestroyPriorityQueue(it);
176 }
177 DestroyPriorityQueue(queue->innerQueue);
178 SoftBusFree(queue);
179 }
180
GetOrCreatePriorityQueue(ConnFairPriorityQueue * queue,int32_t id)181 static struct PriorityQueue *GetOrCreatePriorityQueue(ConnFairPriorityQueue *queue, int32_t id)
182 {
183 if (id == 0) {
184 return queue->innerQueue;
185 }
186 struct PriorityQueue *it = NULL;
187 LIST_FOR_EACH_ENTRY(it, &queue->queues, struct PriorityQueue, node) {
188 if (it->id == id) {
189 return it;
190 }
191 }
192
193 struct PriorityQueue *pq = CreatePriorityQueue(id, queue->size);
194 CONN_CHECK_AND_RETURN_RET_LOGE(pq != NULL, NULL, CONN_COMMON, "create priority queue fail");
195 ListTailInsert(&queue->queues, &pq->node);
196 return pq;
197 }
198
WaitCondition(SoftBusCond * condition,SoftBusMutex * mutex,int32_t timeoutMs)199 static int32_t WaitCondition(SoftBusCond *condition, SoftBusMutex *mutex, int32_t timeoutMs)
200 {
201 if (timeoutMs < 0) {
202 return SoftBusCondWait(condition, mutex, NULL);
203 }
204 SoftBusSysTime now = { 0 };
205 int32_t ret = SoftBusGetTime(&now);
206 CONN_CHECK_AND_RETURN_RET_LOGE(ret == SOFTBUS_OK, ret, CONN_COMMON, "get time fail: error=%{public}d", ret);
207
208 int64_t us = timeoutMs * FACTOR_S_MS_US + now.usec;
209 now.sec += us / (FACTOR_S_MS_US * FACTOR_S_MS_US);
210 now.usec = us % (FACTOR_S_MS_US * FACTOR_S_MS_US);
211 return SoftBusCondWait(condition, mutex, &now);
212 }
213
ConnEnqueue(ConnFairPriorityQueue * queue,struct ConnQueueItem * item,int32_t timeoutMs)214 int32_t ConnEnqueue(ConnFairPriorityQueue *queue, struct ConnQueueItem *item, int32_t timeoutMs)
215 {
216 CONN_CHECK_AND_RETURN_RET_LOGW(queue != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "queue is null");
217 CONN_CHECK_AND_RETURN_RET_LOGW(item != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "item is null");
218
219 int32_t code = SoftBusMutexLock(&queue->lock);
220 CONN_CHECK_AND_RETURN_RET_LOGW(code == SOFTBUS_OK, code, CONN_COMMON, "lock queue fail: error=%{public}d", code);
221 bool afterWait = false;
222 do {
223 struct PriorityQueue *pq = GetOrCreatePriorityQueue(queue, item->id);
224 if (pq == NULL) {
225 code = SOFTBUS_MALLOC_ERR;
226 CONN_LOGE(CONN_COMMON,
227 "enqueue fail: get queue fail, id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
228 item->priority, code);
229 break;
230 }
231 code = Enqueue(pq, item);
232 if (code == SOFTBUS_OK) {
233 break;
234 }
235 if (code != QUEUE_FULL) {
236 CONN_LOGE(CONN_COMMON, "enqueue fail: id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
237 item->priority, code);
238 break;
239 }
240 if (afterWait) {
241 CONN_LOGE(CONN_COMMON, "can not enqueue item after being awake up");
242 code = SOFTBUS_CONN_INTERNAL_ERR;
243 break;
244 }
245 CONN_LOGE(CONN_COMMON, "queue is full, id=%{public}d, priority=%{public}d", item->id, item->priority);
246 code = WaitCondition(&queue->enqueueCondition, &queue->lock, timeoutMs);
247 if (code != SOFTBUS_OK) {
248 CONN_LOGE(CONN_COMMON,
249 "wait enqueue condition fail: id=%{public}d, priority=%{public}d, error=%{public}d", item->id,
250 item->priority, code);
251 break;
252 }
253 afterWait = true;
254 } while (true);
255 if (code == SOFTBUS_OK) {
256 SoftBusCondBroadcast(&queue->dequeueCondition);
257 }
258 (void)SoftBusMutexUnlock(&queue->lock);
259 return code;
260 }
261
DequeueInner(ConnFairPriorityQueue * queue,ConnPriority least,struct ConnQueueItem ** outMsg)262 static int32_t DequeueInner(ConnFairPriorityQueue *queue, ConnPriority least, struct ConnQueueItem **outMsg)
263 {
264 int32_t ret = Dequeue(queue->innerQueue, least, outMsg);
265 if (ret != SOFTBUS_OK && ret != QUEUE_EMPTY) {
266 CONN_LOGE(CONN_COMMON, "get item from inner queue fail: error=%{public}d", ret);
267 }
268 return ret;
269 }
270
DequeueFairly(ConnFairPriorityQueue * queue,struct ConnQueueItem ** outMsg)271 static int32_t DequeueFairly(ConnFairPriorityQueue *queue, struct ConnQueueItem **outMsg)
272 {
273 struct PriorityQueue *it = NULL;
274 struct PriorityQueue *next = NULL;
275 LIST_FOR_EACH_ENTRY_SAFE(it, next, &queue->queues, struct PriorityQueue, node) {
276 ListDelete(&it->node);
277 int32_t ret = Dequeue(it, CONN_PRIORITY_LOW, outMsg);
278 if (ret == QUEUE_EMPTY) {
279 DestroyPriorityQueue(it);
280 continue;
281 }
282 ListTailInsert(&queue->queues, &it->node);
283 if (ret != SOFTBUS_OK) {
284 CONN_LOGE(CONN_COMMON, "get item from queue fail: pid=%{public}d, error=%{public}d", it->id, ret);
285 }
286 return ret;
287 }
288 return QUEUE_EMPTY;
289 }
290
ConnDequeue(ConnFairPriorityQueue * queue,struct ConnQueueItem ** outMsg,int32_t timeoutMs)291 int32_t ConnDequeue(ConnFairPriorityQueue *queue, struct ConnQueueItem **outMsg, int32_t timeoutMs)
292 {
293 CONN_CHECK_AND_RETURN_RET_LOGW(queue != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "queue is null");
294 CONN_CHECK_AND_RETURN_RET_LOGW(outMsg != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "out item is null");
295
296 int32_t code = SoftBusMutexLock(&queue->lock);
297 CONN_CHECK_AND_RETURN_RET_LOGW(code == SOFTBUS_OK, code, CONN_COMMON, "lock queue fail: error=%{public}d", code);
298 bool afterWait = false;
299 do {
300 code = DequeueInner(queue, CONN_PRIORITY_MIDDLE, outMsg);
301 if (code != QUEUE_EMPTY) {
302 break;
303 }
304 code = DequeueFairly(queue, outMsg);
305 if (code != QUEUE_EMPTY) {
306 break;
307 }
308 code = DequeueInner(queue, CONN_PRIORITY_LOW, outMsg);
309 if (code != QUEUE_EMPTY) {
310 break;
311 }
312 if (afterWait) {
313 CONN_LOGE(CONN_COMMON, "can not dequeue item after being awake up");
314 code = SOFTBUS_CONN_INTERNAL_ERR;
315 break;
316 }
317
318 code = WaitCondition(&queue->dequeueCondition, &queue->lock, timeoutMs);
319 if (code == SOFTBUS_TIMOUT) {
320 CONN_LOGE(CONN_COMMON, "wait dequeue condition timeout");
321 break;
322 }
323 if (code != SOFTBUS_OK) {
324 CONN_LOGE(CONN_COMMON, "wait dequeue condition fail: error=%{public}d", code);
325 break;
326 }
327 afterWait = true;
328 } while (true);
329 if (code == SOFTBUS_OK) {
330 SoftBusCondBroadcast(&queue->enqueueCondition);
331 }
332 (void)SoftBusMutexUnlock(&queue->lock);
333 return code;
334 }
335