1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "platform/include/thread.h"
17 #include <pthread.h>
18 #include <sched.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <sys/prctl.h>
22 #include <sys/resource.h>
23 #include <sys/syscall.h>
24 #include "platform/include/mutex.h"
25 #include "platform/include/platform_def.h"
26 #include "platform/include/queue.h"
27 #include "platform/include/reactor.h"
28 #include "platform/include/semaphore.h"
29 #include "securec.h"
30
31 #define THREAD_QUEUE_SIZE 128
32 const char THREAD_DEFAULT_NAME[THREAD_NAME_SIZE] = "bt-stack";
33
34 typedef struct Thread {
35 pid_t tid;
36 bool isStopped;
37 pthread_t pthread;
38 Reactor *reactor;
39 Queue *taskQueue;
40 Mutex *apiMutex;
41 } ThreadInternal;
42
43 typedef struct {
44 Thread *thread;
45 Semaphore *sync;
46 char name[THREAD_NAME_SIZE + 1];
47 } StartPromise;
48
/* One unit of work queued by ThreadPostTask() and run on the worker. */
typedef struct {
    void (*func)(void *context); /* callback to invoke */
    void *context;               /* argument handed to func unchanged */
} TaskItem;
53
ReadyToRead(void * queue)54 static void ReadyToRead(void *queue)
55 {
56 ASSERT(queue);
57 TaskItem *item = (TaskItem *)QueueDequeue((Queue *)queue);
58 if (item == NULL) {
59 LOG_ERROR("Thread: Queue Dequeue failed.");
60 return;
61 }
62 item->func(item->context);
63 free(item);
64 }
65
ThreadStartFunc(void * promise)66 static void *ThreadStartFunc(void *promise)
67 {
68 StartPromise *startPromise = (StartPromise *)promise;
69 Thread *thread = startPromise->thread;
70
71 thread->tid = (long int)syscall(__NR_gettid);
72 prctl(PR_SET_NAME, startPromise->name);
73 ReactorSetThreadId(thread->reactor, (unsigned long)pthread_self());
74
75 SemaphorePost(startPromise->sync);
76
77 int fd = QueueGetDequeueFd(thread->taskQueue);
78 ReactorItem *reactorItem = ReactorRegister(thread->reactor, fd, (void *)thread->taskQueue, ReadyToRead, NULL);
79
80 // Start Running reactor.
81 if (ReactorStart(thread->reactor) != 0) {
82 LOG_ERROR("ThreadStartFunc: Reactor run failed.");
83 }
84 ReactorUnregister(reactorItem);
85
86 int num = 0;
87 // Execute all remain tasks in queue after stop Reactor.
88 TaskItem *task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
89 while (num <= THREAD_QUEUE_SIZE && task) {
90 task->func(task->context);
91 free(task);
92 task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
93 num++;
94 }
95
96 return NULL;
97 }
98
ThreadIsSelf(const Thread * thread)99 int32_t ThreadIsSelf(const Thread *thread)
100 {
101 ASSERT(thread);
102 if (pthread_equal(thread->pthread, pthread_self()) != 0) {
103 return 0;
104 } else {
105 return -1;
106 }
107 }
108
/**
 * Stops the worker's reactor and joins the worker, at most once.
 *
 * The whole sequence runs under apiMutex. A thread whose isStopped flag
 * is already set is left untouched.
 *
 * NOTE(review): a call made from the worker itself is only logged; the
 * stop sequence below still runs, including pthread_join() on the calling
 * thread. Callers should avoid reaching this function from the worker.
 *
 * @param thread Thread to stop.
 */
static void ThreadStop(Thread *thread)
{
    MutexLock(thread->apiMutex);

    bool calledFromWorker = (ThreadIsSelf((const Thread *)thread) == 0);
    if (calledFromWorker) {
        LOG_ERROR("ThreadStop: Cannot stop thread by itself.");
    }

    bool alreadyStopped = thread->isStopped;
    if (!alreadyStopped) {
        ReactorStop(thread->reactor);
        pthread_join(thread->pthread, NULL);
        thread->isStopped = true;
    }

    MutexUnlock(thread->apiMutex);
}
122
ThreadCreate(const char * name)123 Thread *ThreadCreate(const char *name)
124 {
125 Thread *thread = (Thread *)calloc(1, (sizeof(Thread)));
126 if (thread == NULL) {
127 return NULL;
128 }
129
130 thread->reactor = ReactorCreate();
131 if (thread->reactor == NULL) {
132 goto ERROR;
133 }
134
135 thread->taskQueue = QueueCreate(THREAD_QUEUE_SIZE);
136 if (thread->taskQueue == NULL) {
137 goto ERROR;
138 }
139
140 thread->apiMutex = MutexCreate();
141 if (thread->apiMutex == NULL) {
142 goto ERROR;
143 }
144
145 StartPromise *promise = (StartPromise *)calloc(1, sizeof(StartPromise));
146 promise->thread = thread;
147 promise->sync = SemaphoreCreate(0);
148 if (promise->sync == NULL) {
149 free(promise);
150 goto ERROR;
151 }
152
153 if (name != NULL) {
154 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, name, THREAD_NAME_SIZE);
155 } else {
156 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, THREAD_DEFAULT_NAME, THREAD_NAME_SIZE);
157 }
158
159 (void)pthread_create(&thread->pthread, NULL, ThreadStartFunc, promise);
160
161 SemaphoreWait(promise->sync);
162 SemaphoreDelete(promise->sync);
163 free(promise);
164
165 return thread;
166
167 ERROR:
168 if (thread != NULL) {
169 ReactorDelete(thread->reactor);
170 QueueDelete(thread->taskQueue, free);
171 MutexDelete(thread->apiMutex);
172 free(thread);
173 }
174 return NULL;
175 }
176
/**
 * Stops the worker and releases everything owned by the thread object.
 *
 * Must be called from a thread other than the worker. A call made on the
 * worker itself is rejected with an error log and releases nothing,
 * because the worker would otherwise join itself and free the reactor,
 * queue and mutex it is still running on.
 *
 * @param thread Thread to delete; NULL is ignored.
 */
void ThreadDelete(Thread *thread)
{
    if (thread == NULL) {
        return;
    }

    if (ThreadIsSelf(thread) == 0) {
        LOG_ERROR("ThreadDelete: Cannot delete thread by itself.");
        return;
    }

    ThreadStop(thread);
    MutexDelete(thread->apiMutex);
    // Tasks that were never executed are released with free().
    QueueDelete(thread->taskQueue, free);
    ReactorDelete(thread->reactor);

    free(thread);
}
190
/**
 * Queues a task for execution on the worker thread.
 *
 * The task record is heap-allocated here and freed by the worker after
 * the callback has run. If the allocation fails the task is dropped and
 * an error is logged.
 *
 * @param thread  Target thread; must not be NULL.
 * @param func    Callback to run; must not be NULL.
 * @param context Opaque argument handed to func unchanged.
 */
void ThreadPostTask(Thread *thread, TaskFunc func, void *context)
{
    ASSERT(thread);
    ASSERT(func);

    TaskItem *task = (TaskItem *)malloc(sizeof(TaskItem));
    if (task == NULL) {
        // The function returns void, so logging is the only way to surface the drop.
        LOG_ERROR("ThreadPostTask: Task alloc failed.");
        return;
    }
    task->func = func;
    task->context = context;
    QueueEnqueue(thread->taskQueue, task);
}
204
ThreadGetReactor(const Thread * thread)205 Reactor *ThreadGetReactor(const Thread *thread)
206 {
207 ASSERT(thread);
208 return thread->reactor;
209 }
210