• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "platform/include/thread.h"
17 #include <pthread.h>
18 #include <sched.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <sys/prctl.h>
22 #include <sys/resource.h>
23 #include <sys/syscall.h>
24 #include "platform/include/mutex.h"
25 #include "platform/include/platform_def.h"
26 #include "platform/include/queue.h"
27 #include "platform/include/reactor.h"
28 #include "platform/include/semaphore.h"
29 #include "securec.h"
30 
31 #define THREAD_QUEUE_SIZE 128
32 const char THREAD_DEFAULT_NAME[THREAD_NAME_SIZE] = "bt-stack";
33 
34 typedef struct Thread {
35     pid_t tid;
36     bool isStopped;
37     pthread_t pthread;
38     Reactor *reactor;
39     Queue *taskQueue;
40     Mutex *apiMutex;
41 } ThreadInternal;
42 
43 typedef struct {
44     Thread *thread;
45     Semaphore *sync;
46     char name[THREAD_NAME_SIZE + 1];
47 } StartPromise;
48 
49 typedef struct {
50     void (*func)(void *context);
51     void *context;
52 } TaskItem;
53 
ReadyToRead(void * queue)54 static void ReadyToRead(void *queue)
55 {
56     ASSERT(queue);
57     TaskItem *item = (TaskItem *)QueueDequeue((Queue *)queue);
58     if (item == NULL) {
59         LOG_ERROR("Thread: Queue Dequeue failed.");
60         return;
61     }
62     item->func(item->context);
63     free(item);
64 }
65 
ThreadStartFunc(void * promise)66 static void *ThreadStartFunc(void *promise)
67 {
68     StartPromise *startPromise = (StartPromise *)promise;
69     Thread *thread = startPromise->thread;
70 
71     thread->tid = (long int)syscall(__NR_gettid);
72     prctl(PR_SET_NAME, startPromise->name);
73     ReactorSetThreadId(thread->reactor, (unsigned long)pthread_self());
74 
75     SemaphorePost(startPromise->sync);
76 
77     int fd = QueueGetDequeueFd(thread->taskQueue);
78     ReactorItem *reactorItem = ReactorRegister(thread->reactor, fd, (void *)thread->taskQueue, ReadyToRead, NULL);
79 
80     // Start Running reactor.
81     if (ReactorStart(thread->reactor) != 0) {
82         LOG_ERROR("ThreadStartFunc: Reactor run failed.");
83     }
84     ReactorUnregister(reactorItem);
85 
86     int num = 0;
87     // Execute all remain tasks in queue after stop Reactor.
88     TaskItem *task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
89     while (num <= THREAD_QUEUE_SIZE && task) {
90         task->func(task->context);
91         free(task);
92         task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
93         num++;
94     }
95 
96     return NULL;
97 }
98 
ThreadIsSelf(const Thread * thread)99 int32_t ThreadIsSelf(const Thread *thread)
100 {
101     ASSERT(thread);
102     if (pthread_equal(thread->pthread, pthread_self()) != 0) {
103         return 0;
104     } else {
105         return -1;
106     }
107 }
108 
ThreadStop(Thread * thread)109 static void ThreadStop(Thread *thread)
110 {
111     MutexLock(thread->apiMutex);
112     if (ThreadIsSelf((const Thread *)thread) == 0) {
113         LOG_ERROR("ThreadStop: Cannot stop thread by itself.");
114     }
115     if (!thread->isStopped) {
116         ReactorStop(thread->reactor);
117         pthread_join(thread->pthread, NULL);
118         thread->isStopped = true;
119     }
120     MutexUnlock(thread->apiMutex);
121 }
122 
ThreadCreate(const char * name)123 Thread *ThreadCreate(const char *name)
124 {
125     Thread *thread = (Thread *)calloc(1, (sizeof(Thread)));
126     if (thread == NULL) {
127         return NULL;
128     }
129 
130     thread->reactor = ReactorCreate();
131     if (thread->reactor == NULL) {
132         goto ERROR;
133     }
134 
135     thread->taskQueue = QueueCreate(THREAD_QUEUE_SIZE);
136     if (thread->taskQueue == NULL) {
137         goto ERROR;
138     }
139 
140     thread->apiMutex = MutexCreate();
141     if (thread->apiMutex == NULL) {
142         goto ERROR;
143     }
144 
145     StartPromise *promise = (StartPromise *)calloc(1, sizeof(StartPromise));
146     promise->thread = thread;
147     promise->sync = SemaphoreCreate(0);
148     if (promise->sync == NULL) {
149         free(promise);
150         goto ERROR;
151     }
152 
153     if (name != NULL) {
154         (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, name, THREAD_NAME_SIZE);
155     } else {
156         (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, THREAD_DEFAULT_NAME, THREAD_NAME_SIZE);
157     }
158 
159     (void)pthread_create(&thread->pthread, NULL, ThreadStartFunc, promise);
160 
161     SemaphoreWait(promise->sync);
162     SemaphoreDelete(promise->sync);
163     free(promise);
164 
165     return thread;
166 
167 ERROR:
168     if (thread != NULL) {
169         ReactorDelete(thread->reactor);
170         QueueDelete(thread->taskQueue, free);
171         MutexDelete(thread->apiMutex);
172         free(thread);
173     }
174     return NULL;
175 }
176 
ThreadDelete(Thread * thread)177 void ThreadDelete(Thread *thread)
178 {
179     if (thread == NULL) {
180         return;
181     }
182 
183     ThreadStop(thread);
184     MutexDelete(thread->apiMutex);
185     QueueDelete(thread->taskQueue, free);
186     ReactorDelete(thread->reactor);
187 
188     free(thread);
189 }
190 
ThreadPostTask(Thread * thread,TaskFunc func,void * context)191 void ThreadPostTask(Thread *thread, TaskFunc func, void *context)
192 {
193     ASSERT(thread);
194     ASSERT(func);
195 
196     TaskItem *task = (TaskItem *)malloc(sizeof(TaskItem));
197     if (task == NULL) {
198         return;
199     }
200     task->func = func;
201     task->context = context;
202     QueueEnqueue(thread->taskQueue, task);
203 }
204 
ThreadGetReactor(const Thread * thread)205 Reactor *ThreadGetReactor(const Thread *thread)
206 {
207     ASSERT(thread);
208     return thread->reactor;
209 }
210