• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <string.h>
20 
21 #include "osi/allocator.h"
22 #include "osi/fixed_queue.h"
23 #include "osi/semaphore.h"
24 #include "osi/thread.h"
25 
26 struct osi_thread {
27   void *thread_handle;                  /*!< Store the thread object */
28   int  thread_id;                       /*!< May for some OS, such as Linux */
29   bool stop;
30   uint8_t work_queue_num;               /*!< Work queue number */
31   fixed_queue_t **work_queues;          /*!< Point to queue array, and the priority inverse array index */
32   osi_sem_t work_sem;
33   osi_sem_t stop_sem;
34 };
35 
36 struct osi_thread_start_arg {
37   osi_thread_t *thread;
38   osi_sem_t start_sem;
39   int error;
40 };
41 
42 typedef struct {
43   osi_thread_func_t func;
44   void *context;
45 } work_item_t;
46 
/* Capacity passed to fixed_queue_new() for every work queue of a thread. */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
48 
osi_thread_run(void * arg)49 static void osi_thread_run(void *arg)
50 {
51     struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
52     osi_thread_t *thread = start->thread;
53 
54     osi_sem_give(&start->start_sem);
55 
56     while (1) {
57         int idx = 0;
58 
59         osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
60 
61         if (thread->stop) {
62             break;
63         }
64 
65         while (!thread->stop && idx < thread->work_queue_num) {
66             work_item_t *item = fixed_queue_dequeue(thread->work_queues[idx], 0);
67             if (item) {
68                 item->func(item->context);
69                 osi_free(item);
70                 idx = 0;
71                 continue;
72             } else {
73                 idx++;
74             }
75         }
76     }
77 
78     thread->thread_handle = NULL;
79     osi_sem_give(&thread->stop_sem);
80 
81     vTaskDelete(NULL);
82 }
83 
/**
 * Wait for the worker task to announce its exit.
 *
 * @param thread   Thread to wait for; must not be NULL.
 * @param wait_ms  Timeout handed to osi_sem_take().
 * @return The result of osi_sem_take() on stop_sem (0 is treated as success by the caller).
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    assert(thread != NULL);

    int take_result = osi_sem_take(&thread->stop_sem, wait_ms);
    return take_result;
}
89 
/**
 * Ask the worker task to stop and wait up to 1000 ms for it to exit.
 * If the task has not signalled its exit within that time and its handle
 * is still set, the task is deleted from here.
 *
 * @param thread  Thread to stop; must not be NULL.
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    /* Raise the stop flag, then wake the task so it notices it. */
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    /* Join with a 1000 ms timeout; nothing more to do when it succeeds. */
    if (osi_thread_join(thread, 1000) == 0) {
        return;
    }

    /* Join failed: delete the task here if it is still around. */
    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
108 
109 //in linux, the stack_size, priority and core may not be set here, the code will be ignore the arguments
osi_thread_create(const char * name,size_t stack_size,int priority,osi_thread_core_t core,uint8_t work_queue_num)110 osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num)
111 {
112     int ret;
113     osi_thread_t *thread;
114     struct osi_thread_start_arg start_arg = {0};
115 
116     if (stack_size <= 0 ||
117             core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY ||
118             work_queue_num <= 0) {
119         return NULL;
120     }
121 
122     thread = (osi_thread_t *)osi_malloc(sizeof(osi_thread_t));
123     if (thread == NULL) {
124         goto _err;
125     }
126 
127     thread->stop = false;
128     thread->work_queue_num = work_queue_num;
129     thread->work_queues = (fixed_queue_t **)osi_malloc(sizeof(fixed_queue_t *) * work_queue_num);
130     if (thread->work_queues == NULL) {
131         goto _err;
132     }
133 
134     for (int i = 0; i < thread->work_queue_num; i++) {
135         thread->work_queues[i] = fixed_queue_new(DEFAULT_WORK_QUEUE_CAPACITY);
136         if (thread->work_queues[i] == NULL) {
137             goto _err;
138         }
139     }
140 
141     ret = osi_sem_new(&thread->work_sem, 1, 0);
142     if (ret != 0) {
143         goto _err;
144     }
145 
146     ret = osi_sem_new(&thread->stop_sem, 1, 0);
147     if (ret != 0) {
148         goto _err;
149     }
150 
151     start_arg.thread = thread;
152     ret = osi_sem_new(&start_arg.start_sem, 1, 0);
153     if (ret != 0) {
154         goto _err;
155     }
156 
157     if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
158         goto _err;
159     }
160 
161     osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
162     osi_sem_free(&start_arg.start_sem);
163 
164     return thread;
165 
166 _err:
167 
168     if (thread) {
169         if (start_arg.start_sem) {
170             osi_sem_free(&start_arg.start_sem);
171         }
172 
173         if (thread->thread_handle) {
174             vTaskDelete(thread->thread_handle);
175         }
176 
177         for (int i = 0; i < thread->work_queue_num; i++) {
178             if (thread->work_queues[i]) {
179                 fixed_queue_free(thread->work_queues[i], osi_free_func);
180             }
181         }
182 
183         if (thread->work_queues) {
184             osi_free(thread->work_queues);
185         }
186 
187         if (thread->work_sem) {
188             osi_sem_free(&thread->work_sem);
189         }
190 
191         if (thread->stop_sem) {
192             osi_sem_free(&thread->stop_sem);
193         }
194 
195         osi_free(thread);
196     }
197 
198     return NULL;
199 }
200 
/**
 * Stop a worker thread and release everything it owns: the work queues
 * (pending items are freed with osi_free_func), the queue array, both
 * semaphores and the thread object itself.
 *
 * @param thread  Thread to destroy; NULL is accepted and ignored.
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (!thread) {
        return;
    }

    osi_thread_stop(thread);

    /* Check the array pointer before indexing into it, not after. */
    if (thread->work_queues) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i]) {
                fixed_queue_free(thread->work_queues[i], osi_free_func);
            }
        }
        osi_free(thread->work_queues);
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
229 
/**
 * Queue a callback for execution on the worker thread.
 *
 * @param thread     Target thread; must not be NULL.
 * @param func       Callback to run; must not be NULL.
 * @param context    Opaque pointer passed to func.
 * @param queue_idx  Index of the work queue to use; must be within
 *                   [0, work_queue_num).
 * @param timeout    Timeout handed to fixed_queue_enqueue().
 * @return true if the item was enqueued and the thread was signalled;
 *         false if queue_idx is out of range, the item could not be
 *         allocated, or the enqueue failed.
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    /* queue_idx is signed: reject negative values too, otherwise they
     * would index before the start of work_queues. */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t));
    if (item == NULL) {
        return false;
    }
    item->func = func;
    item->context = context;

    if (fixed_queue_enqueue(thread->work_queues[queue_idx], item, timeout) == false) {
        /* The item was not enqueued, so it is freed here. */
        osi_free(item);
        return false;
    }

    /* Wake the worker so it picks the new item up. */
    osi_sem_give(&thread->work_sem);

    return true;
}
255 
/**
 * Change the priority of the task backing a worker thread.
 *
 * @param thread    Thread whose task is adjusted; must not be NULL.
 * @param priority  New priority, forwarded to vTaskPrioritySet().
 * @return Always true.
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    void *task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
263 
/**
 * Look up the name of the task backing a worker thread.
 *
 * @param thread  Thread to query; must not be NULL.
 * @return The string returned by pcTaskGetTaskName() for the thread's task handle.
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    const char *task_name = pcTaskGetTaskName(thread->thread_handle);
    return task_name;
}
270 
/**
 * Report how many items are waiting in one work queue of a thread.
 *
 * @param thread  Thread to inspect.
 * @param wq_idx  Index of the work queue.
 * @return The value of fixed_queue_length() for that queue, or -1 when
 *         wq_idx is outside [0, work_queue_num).
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    if (wq_idx >= 0 && wq_idx < thread->work_queue_num) {
        int pending = fixed_queue_length(thread->work_queues[wq_idx]);
        return pending;
    }

    return -1;
}
279