1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "platform/include/thread.h"
17 #include <pthread.h>
18 #include <sched.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <sys/prctl.h>
22 #include <sys/resource.h>
23 #include <sys/syscall.h>
24 #include "platform/include/mutex.h"
25 #include "platform/include/platform_def.h"
26 #include "platform/include/queue.h"
27 #include "platform/include/reactor.h"
28 #include "platform/include/semaphore.h"
29 #include "securec.h"
30
/* Size passed to QueueCreate() for every thread's task queue; also bounds the drain loop in ThreadStartFunc(). */
31 #define THREAD_QUEUE_SIZE 128
/* Name used for the worker when ThreadCreate() is called with a NULL name. */
32 const char THREAD_DEFAULT_NAME[THREAD_NAME_SIZE] = "bt-stack";
33
/* Internal state behind the Thread handle returned by ThreadCreate(). */
34 typedef struct Thread {
35 pid_t tid; /* set by the worker from syscall(__NR_gettid) */
36 bool isStopped; /* set to true by ThreadStop() after the worker has been joined */
37 pthread_t pthread; /* filled in by pthread_create(); used for the join and by ThreadIsSelf() */
38 Reactor *reactor; /* reactor started by the worker in ThreadStartFunc() */
39 Queue *taskQueue; /* holds the TaskItem pointers enqueued by ThreadPostTask() */
40 Mutex *apiMutex; /* held by ThreadStop() while it stops and joins the worker */
41 } ThreadInternal;
42
/* Start-up data handed from ThreadCreate() to ThreadStartFunc(); allocated and freed by ThreadCreate(). */
43 typedef struct {
44 Thread *thread; /* thread that is being started */
45 Semaphore *sync; /* posted by the worker, waited on by ThreadCreate() */
46 char name[THREAD_NAME_SIZE + 1]; /* name the worker passes to prctl(PR_SET_NAME) */
47 } StartPromise;
48
/* One unit of work stored in a thread's task queue; heap-allocated by ThreadPostTask(). */
49 typedef struct {
50 void (*func)(void *context); /* callback, invoked as func(context) */
51 void *context; /* opaque argument forwarded to func; never dereferenced in this file */
52 } TaskItem;
53
ReadyToRead(void * queue)54 static void ReadyToRead(void *queue)
55 {
56 ASSERT(queue);
57 TaskItem *item = (TaskItem *)QueueDequeue((Queue *)queue);
58 if (item == NULL) {
59 LOG_ERROR("Thread: Queue Dequeue failed.");
60 return;
61 }
62 item->func(item->context);
63 free(item);
64 }
65
ThreadStartFunc(void * promise)66 static void *ThreadStartFunc(void *promise)
67 {
68 StartPromise *startPromise = (StartPromise *)promise;
69 Thread *thread = startPromise->thread;
70
71 thread->tid = (long int)syscall(__NR_gettid);
72 prctl(PR_SET_NAME, startPromise->name);
73 ReactorSetThreadId(thread->reactor, (unsigned long)pthread_self());
74
75 SemaphorePost(startPromise->sync);
76
77 int fd = QueueGetDequeueFd(thread->taskQueue);
78 ReactorItem *reactorItem = ReactorRegister(thread->reactor, fd, (void *)thread->taskQueue, ReadyToRead, NULL);
79
80 // Start Running reactor.
81 if (ReactorStart(thread->reactor) != 0) {
82 LOG_ERROR("ThreadStartFunc: Reactor run failed.");
83 }
84 ReactorUnregister(reactorItem);
85
86 int num = 0;
87 // Execute all remain tasks in queue after stop Reactor.
88 TaskItem *task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
89 while (num <= THREAD_QUEUE_SIZE && task) {
90 task->func(task->context);
91 free(task);
92 task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
93 num++;
94 }
95
96 return NULL;
97 }
98
ThreadIsSelf(const Thread * thread)99 int32_t ThreadIsSelf(const Thread *thread)
100 {
101 ASSERT(thread);
102 if (pthread_equal(thread->pthread, pthread_self()) != 0) {
103 return 0;
104 } else {
105 return -1;
106 }
107 }
108
/**
 * Stops the reactor of a thread and waits for the worker to exit.
 *
 * The whole operation runs under the thread's apiMutex. A thread that has
 * already been stopped is left alone.
 *
 * A worker cannot stop itself: joining the calling thread can never complete,
 * so the request is logged and refused without touching the reactor.
 *
 * @param thread Thread to stop; must not be NULL.
 */
static void ThreadStop(Thread *thread)
{
    MutexLock(thread->apiMutex);

    if (ThreadIsSelf((const Thread *)thread) == 0) {
        LOG_ERROR("ThreadStop: Cannot stop thread by itself.");
        MutexUnlock(thread->apiMutex);
        return;
    }

    if (!thread->isStopped) {
        ReactorStop(thread->reactor);
        pthread_join(thread->pthread, NULL);
        thread->isStopped = true;
    }

    MutexUnlock(thread->apiMutex);
}
122
ThreadCreate(const char * name)123 Thread *ThreadCreate(const char *name)
124 {
125 Thread *thread = (Thread *)calloc(1, (sizeof(Thread)));
126 if (thread == NULL) {
127 return NULL;
128 }
129
130 thread->reactor = ReactorCreate();
131 if (thread->reactor == NULL) {
132 goto ERROR;
133 }
134
135 thread->taskQueue = QueueCreate(THREAD_QUEUE_SIZE);
136 if (thread->taskQueue == NULL) {
137 goto ERROR;
138 }
139
140 thread->apiMutex = MutexCreate();
141 if (thread->apiMutex == NULL) {
142 goto ERROR;
143 }
144
145 StartPromise *promise = (StartPromise *)calloc(1, sizeof(StartPromise));
146 promise->thread = thread;
147 promise->sync = SemaphoreCreate(0);
148 if (promise->sync == NULL) {
149 goto ERROR;
150 }
151
152 if (name != NULL) {
153 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, name, THREAD_NAME_SIZE);
154 } else {
155 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, THREAD_DEFAULT_NAME, THREAD_NAME_SIZE);
156 }
157
158 (void)pthread_create(&thread->pthread, NULL, ThreadStartFunc, promise);
159
160 SemaphoreWait(promise->sync);
161 SemaphoreDelete(promise->sync);
162 free(promise);
163
164 return thread;
165
166 ERROR:
167 if (thread != NULL) {
168 ReactorDelete(thread->reactor);
169 QueueDelete(thread->taskQueue, free);
170 MutexDelete(thread->apiMutex);
171 free(thread);
172 }
173 return NULL;
174 }
175
/**
 * Stops a thread and releases the handle together with its mutex, task queue
 * and reactor.
 *
 * free() is handed to QueueDelete() as the item destructor, presumably for
 * tasks that are still queued.
 *
 * @param thread Handle obtained from ThreadCreate(); NULL is ignored.
 */
void ThreadDelete(Thread *thread)
{
    if (thread != NULL) {
        // The worker has to be stopped before the objects it uses are destroyed.
        ThreadStop(thread);

        MutexDelete(thread->apiMutex);
        QueueDelete(thread->taskQueue, free);
        ReactorDelete(thread->reactor);

        free(thread);
    }
}
189
/**
 * Queues a task for execution on the given thread.
 *
 * The task record is heap-allocated here and freed by the worker after the
 * callback has run. If the allocation fails the task is dropped without
 * notice.
 *
 * @param thread  Target thread; must not be NULL.
 * @param func    Callback to run; must not be NULL.
 * @param context Opaque argument passed to @p func; it is stored as-is and
 *                never dereferenced or freed by this file.
 */
void ThreadPostTask(Thread *thread, TaskFunc func, void *context)
{
    ASSERT(thread);
    ASSERT(func);

    TaskItem *item = (TaskItem *)malloc(sizeof(*item));
    if (item != NULL) {
        item->func = func;
        item->context = context;
        QueueEnqueue(thread->taskQueue, item);
    }
}
203
ThreadGetReactor(const Thread * thread)204 Reactor *ThreadGetReactor(const Thread *thread)
205 {
206 ASSERT(thread);
207 return thread->reactor;
208 }
209