1 /*
2 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
3 *
4 * HDF is dual licensed: you can use it either under the terms of
5 * the GPL, or the BSD license, at your option.
6 * See the LICENSE file in the root of this repository for complete details.
7 */
8
9 #include "platform_queue.h"
10 #include "hdf_log.h"
11 #include "osal_mem.h"
12 #include "osal_mutex.h"
13 #include "osal_thread.h"
14 #include "osal_time.h"
15 #include "platform_errno.h"
16 #include "platform_log.h"
17
/* Value handed to the worker thread configuration as its stackSize. */
#define PLAT_QUEUE_THREAD_STAK 20000
/* Default depthMax of a new queue: messages are rejected once depth reaches it. */
#define PLAT_QUEUE_DEPTH_MAX   32
20
PlatformQueueDoDestroy(struct PlatformQueue * queue)21 static void PlatformQueueDoDestroy(struct PlatformQueue *queue)
22 {
23 (void)OsalThreadDestroy(&queue->thread);
24 (void)OsalSemDestroy(&queue->sem);
25 (void)OsalSpinDestroy(&queue->spin);
26 OsalMemFree(queue);
27 }
28
PlatformQueueNextMsg(struct PlatformQueue * queue,struct PlatformMsg ** msg)29 static int32_t PlatformQueueNextMsg(struct PlatformQueue *queue, struct PlatformMsg **msg)
30 {
31 int32_t ret;
32
33 (void)OsalSpinLock(&queue->spin);
34 if (DListIsEmpty(&queue->msgs)) {
35 ret = HDF_PLT_ERR_NO_DATA;
36 } else {
37 *msg = DLIST_FIRST_ENTRY(&queue->msgs, struct PlatformMsg, node);
38 DListRemove(&((*msg)->node));
39 queue->depth--;
40 ret = HDF_SUCCESS;
41 }
42 (void)OsalSpinUnlock(&queue->spin);
43
44 return ret;
45 }
46
PlatformQueueWorker(void * data)47 static int32_t PlatformQueueWorker(void *data)
48 {
49 int32_t ret;
50 struct PlatformQueue *queue = (struct PlatformQueue *)data;
51 struct PlatformMsg *msg = NULL;
52
53 while (true) {
54 /* wait envent */
55 ret = OsalSemWait(&queue->sem, HDF_WAIT_FOREVER);
56 if (ret != HDF_SUCCESS) {
57 continue;
58 }
59
60 if (!queue->start) {
61 queue->exited = true;
62 break;
63 }
64
65 (void)PlatformQueueNextMsg(queue, &msg);
66 /* message process */
67 if (msg != NULL && queue->handle != NULL) {
68 (void)(queue->handle(queue, msg));
69 msg = NULL;
70 }
71 }
72 return HDF_SUCCESS;
73 }
74
/*
 * Allocate and initialise a message queue (the worker thread is not started).
 *
 * @param handle  callback invoked by the worker for every message; stored as is.
 * @param name    queue name; the pointer itself is stored (no copy is made),
 *                "PlatformQueue" is used when NULL.
 * @param data    opaque pointer stored in queue->data.
 * @return the new queue, or NULL when the allocation or the initialisation of
 *         the spinlock / semaphore fails.
 */
struct PlatformQueue *PlatformQueueCreate(PlatformMsgHandle handle, const char *name, void *data)
{
    struct PlatformQueue *queue = NULL;

    queue = (struct PlatformQueue *)OsalMemCalloc(sizeof(*queue));
    if (queue == NULL) {
        PLAT_LOGE("PlatformQueueCreate: alloc queue fail!");
        return NULL;
    }

    /*
     * Do not hand out a queue whose lock or semaphore could not be set up:
     * every later operation on the queue relies on both of them.
     */
    if (OsalSpinInit(&queue->spin) != HDF_SUCCESS) {
        PLAT_LOGE("PlatformQueueCreate: init spinlock fail!");
        OsalMemFree(queue);
        return NULL;
    }
    if (OsalSemInit(&queue->sem, 0) != HDF_SUCCESS) {
        PLAT_LOGE("PlatformQueueCreate: init semaphore fail!");
        (void)OsalSpinDestroy(&queue->spin);
        OsalMemFree(queue);
        return NULL;
    }

    DListHeadInit(&queue->msgs);
    queue->name = (name == NULL) ? "PlatformQueue" : name;
    queue->handle = handle;
    queue->depth = 0;
    queue->depthMax = PLAT_QUEUE_DEPTH_MAX;
    queue->data = data;
    queue->start = false;
    queue->exited = false;
    return queue;
}
97
PlatformQueueStart(struct PlatformQueue * queue)98 int32_t PlatformQueueStart(struct PlatformQueue *queue)
99 {
100 int32_t ret;
101 struct OsalThreadParam cfg;
102
103 if (queue == NULL) {
104 return HDF_ERR_INVALID_OBJECT;
105 }
106
107 ret = OsalThreadCreate(&queue->thread, (OsalThreadEntry)PlatformQueueWorker, (void *)queue);
108 (void)PlatformQueueWorker;
109 if (ret != HDF_SUCCESS) {
110 PLAT_LOGE("PlatformQueueStart: create thread fail!");
111 return ret;
112 }
113
114 cfg.name = (char *)queue->name;
115 cfg.priority = OSAL_THREAD_PRI_HIGHEST;
116 cfg.stackSize = PLAT_QUEUE_THREAD_STAK;
117 ret = OsalThreadStart(&queue->thread, &cfg);
118 (void)cfg;
119 if (ret != HDF_SUCCESS) {
120 OsalThreadDestroy(&queue->thread);
121 PLAT_LOGE("PlatformQueueStart: start thread fail:%d", ret);
122 return ret;
123 }
124 queue->start = true;
125
126 return HDF_SUCCESS;
127 }
128
PlatformQueueDestroy(struct PlatformQueue * queue)129 void PlatformQueueDestroy(struct PlatformQueue *queue)
130 {
131 if (queue == NULL) {
132 return;
133 }
134
135 if (queue->start) {
136 queue->start = false;
137 (void)OsalSemPost(&queue->sem);
138 while (!queue->exited) {
139 OsalMSleep(1);
140 }
141 PlatformQueueDoDestroy(queue);
142 } else {
143 PlatformQueueDoDestroy(queue);
144 }
145 }
146
PlatformQueueAddMsg(struct PlatformQueue * queue,struct PlatformMsg * msg)147 int32_t PlatformQueueAddMsg(struct PlatformQueue *queue, struct PlatformMsg *msg)
148 {
149 if (queue == NULL || msg == NULL) {
150 return HDF_ERR_INVALID_OBJECT;
151 }
152
153 DListHeadInit(&msg->node);
154 msg->error = HDF_SUCCESS;
155 (void)OsalSpinLock(&queue->spin);
156 if (queue->depth >= queue->depthMax) {
157 (void)OsalSpinUnlock(&queue->spin);
158 HDF_LOGE("PlatformQueueAddMsg: queue(%s) full!", queue->name);
159 return HDF_PLT_OUT_OF_RSC;
160 }
161 DListInsertTail(&msg->node, &queue->msgs);
162 queue->depth++;
163 (void)OsalSpinUnlock(&queue->spin);
164 /* notify the worker thread */
165 (void)OsalSemPost(&queue->sem);
166 return HDF_SUCCESS;
167 }
168
PlatformQueueGetMsg(struct PlatformQueue * queue,struct PlatformMsg ** msg,uint32_t tms)169 int32_t PlatformQueueGetMsg(struct PlatformQueue *queue, struct PlatformMsg **msg, uint32_t tms)
170 {
171 int32_t ret;
172
173 if (queue == NULL) {
174 return HDF_ERR_INVALID_OBJECT;
175 }
176 if (msg == NULL) {
177 return HDF_ERR_INVALID_PARAM;
178 }
179
180 ret = PlatformQueueNextMsg(queue, msg);
181 if (ret == HDF_SUCCESS) {
182 (void)OsalSemWait(&queue->sem, HDF_WAIT_FOREVER); // consume the semaphore after get
183 return ret;
184 }
185
186 if (tms == 0) {
187 return HDF_PLT_ERR_NO_DATA;
188 }
189
190 ret = OsalSemWait(&queue->sem, tms);
191 if (ret != HDF_SUCCESS) {
192 return ret;
193 }
194 return PlatformQueueNextMsg(queue, msg);
195 }
196
197