• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #define LOG_TAG "osi_thread"
20 
21 #include <assert.h>
22 #include <errno.h>
23 #include <pthread.h>
24 #include <string.h>
25 #include <sys/prctl.h>
26 #include <sys/types.h>
27 #include <utils/Log.h>
28 
29 #include "fixed_queue.h"
30 #include "reactor.h"
31 #include "semaphore.h"
32 #include "thread.h"
33 
34 struct thread_t {  // State for one worker thread created by thread_new().
35   pthread_t pthread;  // Handle filled in by pthread_create(); joined in thread_free().
36   pid_t tid;  // Thread id stored by run_thread() from gettid().
37   char name[THREAD_NAME_MAX + 1];  // Thread name; the extra byte keeps room for the NUL terminator.
38   reactor_t *reactor;  // Reactor started by run_thread() and stopped by thread_stop().
39   fixed_queue_t *work_queue;  // Queue of work_item_t pointers enqueued by thread_post().
40 };
41 
42 struct start_arg {  // Hand-off data passed from thread_new() to run_thread().
43   thread_t *thread;  // The thread object the new pthread should run.
44   semaphore_t *start_sem;  // Posted by run_thread() once start-up has succeeded or failed.
45   int error;  // 0 on success; set by run_thread() to an errno value on start-up failure.
46 };
47 
48 typedef struct {  // One unit of work queued by thread_post().
49   thread_fn func;  // Function invoked when the item is dispatched.
50   void *context;  // Argument passed to |func|.
51 } work_item_t;
52 
53 static void *run_thread(void *start_arg);  // pthread entry point; receives a struct start_arg *.
54 static void work_queue_read_cb(void *context);  // Reactor read callback; |context| is the thread's work queue.
55 
56 static const size_t WORK_QUEUE_CAPACITY = 128;  // Capacity passed to fixed_queue_new(); also bounds the shutdown drain in run_thread().
57 
thread_new(const char * name)58 thread_t *thread_new(const char *name) {
59   assert(name != NULL);
60 
61   // Start is on the stack, but we use a semaphore, so it's safe
62   thread_t *ret = calloc(1, sizeof(thread_t));
63   if (!ret)
64     goto error;
65 
66   ret->reactor = reactor_new();
67   if (!ret->reactor)
68     goto error;
69 
70   ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY);
71   if (!ret->work_queue)
72     goto error;
73 
74   struct start_arg start;
75   start.start_sem = semaphore_new(0);
76   if (!start.start_sem)
77     goto error;
78 
79   strncpy(ret->name, name, THREAD_NAME_MAX);
80   start.thread = ret;
81   start.error = 0;
82   pthread_create(&ret->pthread, NULL, run_thread, &start);
83   semaphore_wait(start.start_sem);
84   semaphore_free(start.start_sem);
85   if (start.error)
86     goto error;
87   return ret;
88 
89 error:;
90   if (ret) {
91     fixed_queue_free(ret->work_queue, free);
92     reactor_free(ret->reactor);
93   }
94   free(ret);
95   return NULL;
96 }
97 
// Stops |thread|, waits for its pthread to exit, then releases the work
// queue (freeing any items still in it), the reactor and the thread object.
// Passing NULL is allowed and does nothing.
void thread_free(thread_t *thread) {
  if (thread != NULL) {
    // Ask the reactor loop to exit, then wait for the pthread to finish
    // before tearing down the objects it uses.
    thread_stop(thread);
    pthread_join(thread->pthread, NULL);

    fixed_queue_free(thread->work_queue, free);
    reactor_free(thread->reactor);
    free(thread);
  }
}
108 
// Queues |func| to be called with |context| by |thread|.
//
// Neither |thread| nor |func| may be NULL. Returns true once the work item
// has been placed on the thread's work queue, or false (after logging) if
// the work item could not be allocated.
bool thread_post(thread_t *thread, thread_fn func, void *context) {
  assert(thread != NULL);
  assert(func != NULL);

  // TODO(sharvil): if the current thread == |thread| and we've run out
  // of queue space, we should abort this operation, otherwise we'll
  // deadlock.

  // Ownership of the work item passes to the queue: it is freed either when
  // it is dequeued for dispatch or when the queue itself is destroyed.
  work_item_t *work = malloc(sizeof(*work));
  if (work == NULL) {
    ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno));
    return false;
  }

  work->func = func;
  work->context = context;

  fixed_queue_enqueue(thread->work_queue, work);
  return true;
}
129 
// Requests that |thread|'s reactor stop. This function does not join the
// underlying pthread; thread_free() does that. |thread| must not be NULL.
void thread_stop(thread_t *thread) {
  assert(thread != NULL);

  reactor_t *reactor = thread->reactor;
  reactor_stop(reactor);
}
134 
thread_name(const thread_t * thread)135 const char *thread_name(const thread_t *thread) {
136   assert(thread != NULL);
137   return thread->name;
138 }
139 
run_thread(void * start_arg)140 static void *run_thread(void *start_arg) {
141   assert(start_arg != NULL);
142 
143   struct start_arg *start = start_arg;
144   thread_t *thread = start->thread;
145 
146   assert(thread != NULL);
147 
148   if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
149     ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
150     start->error = errno;
151     semaphore_post(start->start_sem);
152     return NULL;
153   }
154   thread->tid = gettid();
155 
156   semaphore_post(start->start_sem);
157 
158   reactor_object_t work_queue_object;
159   work_queue_object.context = thread->work_queue;
160   work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
161   work_queue_object.interest = REACTOR_INTEREST_READ;
162   work_queue_object.read_ready = work_queue_read_cb;
163 
164   reactor_register(thread->reactor, &work_queue_object);
165   reactor_start(thread->reactor);
166 
167   // Make sure we dispatch all queued work items before exiting the thread.
168   // This allows a caller to safely tear down by enqueuing a teardown
169   // work item and then joining the thread.
170   size_t count = 0;
171   work_item_t *item = fixed_queue_try_dequeue(thread->work_queue);
172   while (item && count <= WORK_QUEUE_CAPACITY) {
173     item->func(item->context);
174     free(item);
175     item = fixed_queue_try_dequeue(thread->work_queue);
176     ++count;
177   }
178 
179   if (count > WORK_QUEUE_CAPACITY)
180     ALOGD("%s growing event queue on shutdown.", __func__);
181 
182   return NULL;
183 }
184 
work_queue_read_cb(void * context)185 static void work_queue_read_cb(void *context) {
186   assert(context != NULL);
187 
188   fixed_queue_t *queue = (fixed_queue_t *)context;
189   work_item_t *item = fixed_queue_dequeue(queue);
190   item->func(item->context);
191   free(item);
192 }
193