1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "btm_thread.h"
17
18 #include <stddef.h>
19
20 #include "btstack.h"
21 #include "platform/include/allocator.h"
22 #include "platform/include/list.h"
23 #include "platform/include/mutex.h"
24 #include "platform/include/queue.h"
25 #include "platform/include/semaphore.h"
26 #include "platform/include/thread.h"
27
/* A unit of deferred work: a callback together with the argument it is called with. */
typedef struct BtmTask {
    void (*task)(void *context); /* Function invoked when the task is run. */
    void *context;               /* Passed unchanged to task(); not freed by FreeTask. */
} BtmTask;
32
33 typedef struct {
34 uint8_t id;
35 Queue *queue;
36 ReactorItem *reactorItem;
37 } BtmProcessingQueue;
38
39 static Thread *g_processingThread = NULL;
40 static List *g_processingQueueList = NULL;
41 static Mutex *g_processingQueueLock = NULL;
42
AllocTask(void (* task)(void * context),void * context)43 static BtmTask *AllocTask(void (*task)(void *context), void *context)
44 {
45 BtmTask *block = MEM_MALLOC.alloc(sizeof(BtmTask));
46 if (block != NULL) {
47 block->task = task;
48 block->context = context;
49 }
50 return block;
51 }
52
FreeTask(void * task)53 static void FreeTask(void *task)
54 {
55 MEM_MALLOC.free(task);
56 }
57
RunTask(void * context)58 static void RunTask(void *context)
59 {
60 BtmTask *task = QueueTryDequeue((Queue *)context);
61 if (task != NULL) {
62 task->task(task->context);
63
64 FreeTask(task);
65 }
66 }
67
AllocProcessingQueue(uint8_t id,uint32_t size)68 static BtmProcessingQueue *AllocProcessingQueue(uint8_t id, uint32_t size)
69 {
70 BtmProcessingQueue *block = MEM_MALLOC.alloc(sizeof(BtmProcessingQueue));
71 if (block != NULL) {
72 block->id = id;
73 block->queue = QueueCreate(size);
74 if (block->queue != NULL) {
75 Reactor *reactor = ThreadGetReactor(g_processingThread);
76 block->reactorItem = ReactorRegister(reactor, QueueGetDequeueFd(block->queue), block->queue, RunTask, NULL);
77 }
78 }
79 return block;
80 }
81
82 typedef struct {
83 Queue *queue;
84 Semaphore *semaphore;
85 } RunAllTaskContext;
86
RunAllTaskInQueueTask(void * param)87 static void RunAllTaskInQueueTask(void *param)
88 {
89 RunAllTaskContext *context = (RunAllTaskContext *)param;
90
91 BtmTask *task = QueueTryDequeue(context->queue);
92 while (task != NULL) {
93 task->task(task->context);
94
95 FreeTask(task);
96
97 task = QueueTryDequeue(context->queue);
98 }
99
100 if (context->semaphore != NULL) {
101 SemaphorePost(context->semaphore);
102 }
103 }
104
/*
 * Runs every task still pending in queue and returns once that is done.
 *
 * When ThreadIsSelf(g_processingThread) yields 0 the tasks are run inline
 * (presumably 0 means the caller already is the processing thread - confirm
 * against thread.h). Otherwise the drain is posted to the processing thread
 * and this function blocks on a semaphore until it has finished.
 *
 * If the semaphore cannot be created, nothing is posted and the queue is left
 * untouched: the context lives on this stack frame, so posting without a way
 * to wait would let the processing thread use it after this function returns.
 * The only caller in this file deletes the queue with FreeTask afterwards, so
 * the remaining tasks are released rather than leaked.
 */
static void RunAllTaskInQueue(Queue *queue)
{
    RunAllTaskContext context = {
        .queue = queue,
        .semaphore = NULL,
    };

    if (ThreadIsSelf(g_processingThread) == 0) {
        RunAllTaskInQueueTask(&context);
        return;
    }

    context.semaphore = SemaphoreCreate(0);
    if (context.semaphore == NULL) {
        return;
    }

    ThreadPostTask(g_processingThread, RunAllTaskInQueueTask, &context);
    SemaphoreWait(context.semaphore);
    SemaphoreDelete(context.semaphore);
}
121
FreeProcessingQueue(void * queue)122 static void FreeProcessingQueue(void *queue)
123 {
124 BtmProcessingQueue *block = (BtmProcessingQueue *)queue;
125 if (block->reactorItem != NULL) {
126 ReactorUnregister(block->reactorItem);
127 block->reactorItem = NULL;
128 }
129 if (block->queue != NULL) {
130 QueueDelete(block->queue, FreeTask);
131 block->queue = NULL;
132 }
133 MEM_MALLOC.free(queue);
134 }
135
FindProcessingQueueById(uint8_t queueId)136 static BtmProcessingQueue *FindProcessingQueueById(uint8_t queueId)
137 {
138 BtmProcessingQueue *queue = NULL;
139
140 ListNode *node = NULL;
141 node = ListGetFirstNode(g_processingQueueList);
142 while (node != NULL) {
143 queue = ListGetNodeData(node);
144 if (queue->id == queueId) {
145 break;
146 } else {
147 queue = NULL;
148 }
149 node = ListGetNextNode(node);
150 }
151
152 return queue;
153 }
154
BtmInitThread()155 void BtmInitThread()
156 {
157 g_processingThread = ThreadCreate("Stack");
158 g_processingQueueList = ListCreate(FreeProcessingQueue);
159 g_processingQueueLock = MutexCreate();
160 }
161
BtmCloseThread()162 void BtmCloseThread()
163 {
164 if (g_processingQueueList != NULL) {
165 ListDelete(g_processingQueueList);
166 g_processingQueueList = NULL;
167 }
168
169 if (g_processingThread != NULL) {
170 ThreadDelete(g_processingThread);
171 g_processingThread = NULL;
172 }
173
174 if (g_processingQueueLock != NULL) {
175 MutexDelete(g_processingQueueLock);
176 g_processingQueueLock = NULL;
177 }
178 }
179
BTM_GetProcessingThread()180 Thread *BTM_GetProcessingThread()
181 {
182 return g_processingThread;
183 }
184
BTM_CreateProcessingQueue(uint8_t queueId,uint16_t size)185 int BTM_CreateProcessingQueue(uint8_t queueId, uint16_t size)
186 {
187 int result = BT_NO_ERROR;
188 MutexLock(g_processingQueueLock);
189
190 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
191 if (queue != NULL) {
192 result = BT_BAD_STATUS;
193 } else {
194 queue = AllocProcessingQueue(queueId, size);
195 ListAddLast(g_processingQueueList, queue);
196 }
197
198 MutexUnlock(g_processingQueueLock);
199 return result;
200 }
201
BTM_DeleteProcessingQueue(uint8_t queueId)202 int BTM_DeleteProcessingQueue(uint8_t queueId)
203 {
204 int result = BT_NO_ERROR;
205
206 Queue *taskQueue = NULL;
207
208 MutexLock(g_processingQueueLock);
209 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
210 if (queue != NULL) {
211 taskQueue = queue->queue;
212 queue->queue = NULL;
213 ListRemoveNode(g_processingQueueList, queue);
214 } else {
215 result = BT_BAD_STATUS;
216 }
217 MutexUnlock(g_processingQueueLock);
218
219 if (taskQueue != NULL) {
220 RunAllTaskInQueue(taskQueue);
221 QueueDelete(taskQueue, FreeTask);
222 taskQueue = NULL;
223 }
224
225 return result;
226 }
227
BTM_RunTaskInProcessingQueue(uint8_t queueId,void (* task)(void * context),void * context)228 int BTM_RunTaskInProcessingQueue(uint8_t queueId, void (*task)(void *context), void *context)
229 {
230 int result = BT_NO_ERROR;
231 MutexLock(g_processingQueueLock);
232 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
233 if (queue != NULL) {
234 BtmTask *block = AllocTask(task, context);
235 if (task != NULL) {
236 QueueEnqueue(queue->queue, block);
237 } else {
238 result = BT_NO_MEMORY;
239 }
240 } else {
241 result = BT_BAD_STATUS;
242 }
243 MutexUnlock(g_processingQueueLock);
244 return result;
245 }