1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <string.h>
20
21 #include "osi/allocator.h"
22 #include "osi/fixed_queue.h"
23 #include "osi/semaphore.h"
24 #include "osi/thread.h"
25
26 struct osi_thread {
27 void *thread_handle; /*!< Store the thread object */
28 int thread_id; /*!< May for some OS, such as Linux */
29 bool stop;
30 uint8_t work_queue_num; /*!< Work queue number */
31 fixed_queue_t **work_queues; /*!< Point to queue array, and the priority inverse array index */
32 osi_sem_t work_sem;
33 osi_sem_t stop_sem;
34 };
35
36 struct osi_thread_start_arg {
37 osi_thread_t *thread;
38 osi_sem_t start_sem;
39 int error;
40 };
41
42 typedef struct {
43 osi_thread_func_t func;
44 void *context;
45 } work_item_t;
46
/* Capacity passed to fixed_queue_new() for every work queue created by osi_thread_create(). */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
48
osi_thread_run(void * arg)49 static void osi_thread_run(void *arg)
50 {
51 struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
52 osi_thread_t *thread = start->thread;
53
54 osi_sem_give(&start->start_sem);
55
56 while (1) {
57 int idx = 0;
58
59 osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
60
61 if (thread->stop) {
62 break;
63 }
64
65 while (!thread->stop && idx < thread->work_queue_num) {
66 work_item_t *item = fixed_queue_dequeue(thread->work_queues[idx], 0);
67 if (item) {
68 item->func(item->context);
69 osi_free(item);
70 idx = 0;
71 continue;
72 } else {
73 idx++;
74 }
75 }
76 }
77
78 thread->thread_handle = NULL;
79 osi_sem_give(&thread->stop_sem);
80
81 vTaskDelete(NULL);
82 }
83
/*
 * Wait for the worker to signal that it has left its loop.
 *
 * thread  - worker to wait for; must not be NULL.
 * wait_ms - timeout handed to osi_sem_take() on stop_sem.
 *
 * Returns whatever osi_sem_take() returns; osi_thread_stop() treats 0 as
 * "the thread has exited".
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    assert(thread != NULL);

    int result = osi_sem_take(&thread->stop_sem, wait_ms);
    return result;
}
89
/*
 * Ask the worker to exit and wait up to 1000 ms for it to do so.
 *
 * The stop flag is raised and work_sem is given so the worker wakes up and
 * sees the flag. If the join does not succeed and the task handle is still
 * set, the task is deleted from here.
 *
 * thread - worker to stop; must not be NULL.
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    /* Request the stop and wake the worker. */
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    /* Join, waiting 1000 ms. */
    if (osi_thread_join(thread, 1000) == 0) {
        return;
    }

    /* Join failed: delete the task here if it has not cleared its handle. */
    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
108
109 //in linux, the stack_size, priority and core may not be set here, the code will be ignore the arguments
osi_thread_create(const char * name,size_t stack_size,int priority,osi_thread_core_t core,uint8_t work_queue_num)110 osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num)
111 {
112 int ret;
113 osi_thread_t *thread;
114 struct osi_thread_start_arg start_arg = {0};
115
116 if (stack_size <= 0 ||
117 core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY ||
118 work_queue_num <= 0) {
119 return NULL;
120 }
121
122 thread = (osi_thread_t *)osi_malloc(sizeof(osi_thread_t));
123 if (thread == NULL) {
124 goto _err;
125 }
126
127 thread->stop = false;
128 thread->work_queue_num = work_queue_num;
129 thread->work_queues = (fixed_queue_t **)osi_malloc(sizeof(fixed_queue_t *) * work_queue_num);
130 if (thread->work_queues == NULL) {
131 goto _err;
132 }
133
134 for (int i = 0; i < thread->work_queue_num; i++) {
135 thread->work_queues[i] = fixed_queue_new(DEFAULT_WORK_QUEUE_CAPACITY);
136 if (thread->work_queues[i] == NULL) {
137 goto _err;
138 }
139 }
140
141 ret = osi_sem_new(&thread->work_sem, 1, 0);
142 if (ret != 0) {
143 goto _err;
144 }
145
146 ret = osi_sem_new(&thread->stop_sem, 1, 0);
147 if (ret != 0) {
148 goto _err;
149 }
150
151 start_arg.thread = thread;
152 ret = osi_sem_new(&start_arg.start_sem, 1, 0);
153 if (ret != 0) {
154 goto _err;
155 }
156
157 if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
158 goto _err;
159 }
160
161 osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
162 osi_sem_free(&start_arg.start_sem);
163
164 return thread;
165
166 _err:
167
168 if (thread) {
169 if (start_arg.start_sem) {
170 osi_sem_free(&start_arg.start_sem);
171 }
172
173 if (thread->thread_handle) {
174 vTaskDelete(thread->thread_handle);
175 }
176
177 for (int i = 0; i < thread->work_queue_num; i++) {
178 if (thread->work_queues[i]) {
179 fixed_queue_free(thread->work_queues[i], osi_free_func);
180 }
181 }
182
183 if (thread->work_queues) {
184 osi_free(thread->work_queues);
185 }
186
187 if (thread->work_sem) {
188 osi_sem_free(&thread->work_sem);
189 }
190
191 if (thread->stop_sem) {
192 osi_sem_free(&thread->stop_sem);
193 }
194
195 osi_free(thread);
196 }
197
198 return NULL;
199 }
200
/*
 * Stop a worker thread and release everything it owns.
 *
 * The thread is stopped first via osi_thread_stop(). Its queues are then
 * freed with osi_free_func passed to fixed_queue_free() as the per-item
 * free callback, followed by the queue array, both semaphores and the
 * thread object itself.
 *
 * thread - worker to destroy; NULL is accepted and ignored.
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (!thread) {
        return;
    }

    osi_thread_stop(thread);

    /* Check the array pointer before indexing into it, not after. */
    if (thread->work_queues) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i]) {
                fixed_queue_free(thread->work_queues[i], osi_free_func);
            }
        }
        osi_free(thread->work_queues);
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
229
/*
 * Queue a callback for execution on the worker thread.
 *
 * thread    - target worker; must not be NULL.
 * func      - callback to run; must not be NULL.
 * context   - opaque pointer passed to func.
 * queue_idx - index of the work queue to post to; must satisfy
 *             0 <= queue_idx < work_queue_num.
 * timeout   - timeout forwarded to fixed_queue_enqueue().
 *
 * Returns true when the item was queued and the worker was signalled.
 * Returns false when queue_idx is out of range, the work item cannot be
 * allocated, or the enqueue fails; nothing stays allocated in those cases.
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    /* Reject negative indexes too: they would index before the queue array. */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    work_item_t *item = osi_malloc(sizeof(*item));
    if (item == NULL) {
        return false;
    }
    item->func = func;
    item->context = context;

    if (fixed_queue_enqueue(thread->work_queues[queue_idx], item, timeout) == false) {
        /* The queue did not take ownership, so the item is still ours to free. */
        osi_free(item);
        return false;
    }

    /* Wake the worker so it scans its queues. */
    osi_sem_give(&thread->work_sem);

    return true;
}
255
/*
 * Change the priority of the worker's task.
 *
 * thread   - worker whose task priority is changed; must not be NULL.
 * priority - new priority, forwarded to vTaskPrioritySet().
 *
 * Always returns true.
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    void *task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
263
/*
 * Look up the name of the worker's task.
 *
 * thread - worker to query; must not be NULL.
 *
 * Returns the string reported by pcTaskGetTaskName() for the stored handle.
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    void *task = thread->thread_handle;
    return pcTaskGetTaskName(task);
}
270
/*
 * Report the length of one of the worker's work queues.
 *
 * thread - worker to query.
 * wq_idx - index of the work queue.
 *
 * Returns the value of fixed_queue_length() for that queue, or -1 when
 * wq_idx is negative or not below work_queue_num.
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    const bool in_range = (wq_idx >= 0) && (wq_idx < thread->work_queue_num);

    if (!in_range) {
        return -1;
    }

    return fixed_queue_length(thread->work_queues[wq_idx]);
}
279