• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #define LOG_TAG "bt_osi_reactor"
20 
21 #include <assert.h>
22 #include <errno.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/epoll.h>
27 #include <sys/eventfd.h>
28 
29 #include "osi/include/allocator.h"
30 #include "osi/include/list.h"
31 #include "osi/include/log.h"
32 #include "osi/include/reactor.h"
33 
34 #if !defined(EFD_SEMAPHORE)
35 #  define EFD_SEMAPHORE (1 << 0)
36 #endif
37 
38 struct reactor_t {
39   int epoll_fd;
40   int event_fd;
41   pthread_mutex_t list_lock;  // protects invalidation_list.
42   list_t *invalidation_list;  // reactor objects that have been unregistered.
43   pthread_t run_thread;       // the pthread on which reactor_run is executing.
44   bool is_running;            // indicates whether |run_thread| is valid.
45   bool object_removed;
46 };
47 
48 struct reactor_object_t {
49   int fd;                              // the file descriptor to monitor for events.
50   void *context;                       // a context that's passed back to the *_ready functions.
51   reactor_t *reactor;                  // the reactor instance this object is registered with.
52   pthread_mutex_t lock;                // protects the lifetime of this object and all variables.
53 
54   void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
55   void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
56 };
57 
58 static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
59 
// Capacity of the epoll_event buffer filled on each pass of the reactor loop,
// also used as the size hint for epoll_create. Declared as an enumerator
// rather than a const object: in C an enumerator is an integer constant
// expression, so arrays sized by it are ordinary fixed-size arrays (not VLAs),
// and its int type matches the |maxevents| argument of epoll_wait.
enum { MAX_EVENTS = 64 };

// Value written to the reactor's eventfd to ask the loop to stop.
static const eventfd_t EVENT_REACTOR_STOP = 1;
62 
reactor_new(void)63 reactor_t *reactor_new(void) {
64   reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));
65   if (!ret)
66     return NULL;
67 
68   ret->epoll_fd = INVALID_FD;
69   ret->event_fd = INVALID_FD;
70 
71   ret->epoll_fd = epoll_create(MAX_EVENTS);
72   if (ret->epoll_fd == INVALID_FD) {
73     LOG_ERROR("%s unable to create epoll instance: %s", __func__, strerror(errno));
74     goto error;
75   }
76 
77   ret->event_fd = eventfd(0, 0);
78   if (ret->event_fd == INVALID_FD) {
79     LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno));
80     goto error;
81   }
82 
83   pthread_mutex_init(&ret->list_lock, NULL);
84   ret->invalidation_list = list_new(NULL);
85   if (!ret->invalidation_list) {
86     LOG_ERROR("%s unable to allocate object invalidation list.", __func__);
87     goto error;
88   }
89 
90   struct epoll_event event;
91   memset(&event, 0, sizeof(event));
92   event.events = EPOLLIN;
93   event.data.ptr = NULL;
94   if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
95     LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
96     goto error;
97   }
98 
99   return ret;
100 
101 error:;
102   reactor_free(ret);
103   return NULL;
104 }
105 
reactor_free(reactor_t * reactor)106 void reactor_free(reactor_t *reactor) {
107   if (!reactor)
108     return;
109 
110   list_free(reactor->invalidation_list);
111   close(reactor->event_fd);
112   close(reactor->epoll_fd);
113   osi_free(reactor);
114 }
115 
reactor_start(reactor_t * reactor)116 reactor_status_t reactor_start(reactor_t *reactor) {
117   assert(reactor != NULL);
118   return run_reactor(reactor, 0);
119 }
120 
reactor_run_once(reactor_t * reactor)121 reactor_status_t reactor_run_once(reactor_t *reactor) {
122   assert(reactor != NULL);
123   return run_reactor(reactor, 1);
124 }
125 
// Asks the reactor loop to stop by writing EVENT_REACTOR_STOP to the reactor's
// eventfd; the loop returns once it observes the eventfd becoming readable.
// |reactor| may not be NULL. A failed write is logged, since in that case the
// loop is never woken and the stop request is lost.
void reactor_stop(reactor_t *reactor) {
  assert(reactor != NULL);

  if (eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP) == -1) {
    LOG_ERROR("%s unable to signal reactor to stop: %s", __func__, strerror(errno));
  }
}
131 
reactor_register(reactor_t * reactor,int fd,void * context,void (* read_ready)(void * context),void (* write_ready)(void * context))132 reactor_object_t *reactor_register(reactor_t *reactor,
133     int fd, void *context,
134     void (*read_ready)(void *context),
135     void (*write_ready)(void *context)) {
136   assert(reactor != NULL);
137   assert(fd != INVALID_FD);
138 
139   reactor_object_t *object = (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
140   if (!object) {
141     LOG_ERROR("%s unable to allocate reactor object: %s", __func__, strerror(errno));
142     return NULL;
143   }
144 
145   object->reactor = reactor;
146   object->fd = fd;
147   object->context = context;
148   object->read_ready = read_ready;
149   object->write_ready = write_ready;
150   pthread_mutex_init(&object->lock, NULL);
151 
152   struct epoll_event event;
153   memset(&event, 0, sizeof(event));
154   if (read_ready)
155     event.events |= (EPOLLIN | EPOLLRDHUP);
156   if (write_ready)
157     event.events |= EPOLLOUT;
158   event.data.ptr = object;
159 
160   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
161     LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
162     pthread_mutex_destroy(&object->lock);
163     osi_free(object);
164     return NULL;
165   }
166 
167   return object;
168 }
169 
// Replaces the callbacks of an existing registration and updates the epoll
// interest set to match: readable events are requested only when |read_ready|
// is non-NULL, writeable events only when |write_ready| is non-NULL.
// Returns true on success; returns false, leaving the stored callbacks
// untouched, if the epoll set could not be modified. |object| may not be NULL.
//
// NOTE(review): the reactor loop holds |object->lock| while it runs this
// object's callbacks, and the lock is created with default attributes, so
// calling this from inside one of those callbacks looks like it would
// self-deadlock. Confirm against callers.
bool reactor_change_registration(reactor_object_t *object,
    void (*read_ready)(void *context),
    void (*write_ready)(void *context)) {
  assert(object != NULL);

  struct epoll_event interest;
  memset(&interest, 0, sizeof(interest));
  interest.data.ptr = object;
  if (read_ready) {
    interest.events |= (EPOLLIN | EPOLLRDHUP);
  }
  if (write_ready) {
    interest.events |= EPOLLOUT;
  }

  const int rc = epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &interest);
  if (rc == -1) {
    LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
    return false;
  }

  // Swap the callbacks under the object lock so the reactor loop never sees
  // them change while it is dispatching on this object.
  pthread_mutex_lock(&object->lock);
  object->write_ready = write_ready;
  object->read_ready = read_ready;
  pthread_mutex_unlock(&object->lock);

  return true;
}
195 
// Removes |obj| from its reactor's epoll set and releases it. |obj| may not
// be NULL and must not be used after this call.
//
// When called on the thread that is running the reactor loop, the object is
// not freed here: |object_removed| is set and the loop frees the object it is
// currently dispatching on. When called from any other thread, the object is
// freed here once the reactor thread can no longer be using it.
void reactor_unregister(reactor_object_t *obj) {
  assert(obj != NULL);

  reactor_t *owner = obj->reactor;

  const int rc = epoll_ctl(owner->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL);
  if (rc == -1) {
    LOG_ERROR("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
  }

  const bool on_run_thread =
      owner->is_running && pthread_equal(pthread_self(), owner->run_thread);
  if (on_run_thread) {
    // Defer destruction to the reactor loop.
    owner->object_removed = true;
    return;
  }

  // Publish the object as invalid so the reactor thread skips any event it
  // has already fetched for it.
  pthread_mutex_lock(&owner->list_lock);
  list_append(owner->invalidation_list, obj);
  pthread_mutex_unlock(&owner->list_lock);

  // Acquiring the object lock guarantees that no callback for |obj| is
  // executing right now. The reactor thread is therefore either before the
  // callbacks or after them. If it is after, |obj| will not be touched again
  // because it is no longer in the epoll set. If it is before, |obj| will not
  // be touched because the reactor thread checks the invalidation_list and
  // finds it there. Taking the lock thus waits until the reactor thread has
  // dropped every reference to |obj|. Once the wait completes, the lock can
  // be released and |obj| destroyed safely.
  pthread_mutex_lock(&obj->lock);
  pthread_mutex_unlock(&obj->lock);
  pthread_mutex_destroy(&obj->lock);
  osi_free(obj);
}
226 
227 // Runs the reactor loop for a maximum of |iterations|.
228 // 0 |iterations| means loop forever.
229 // |reactor| may not be NULL.
run_reactor(reactor_t * reactor,int iterations)230 static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
231   assert(reactor != NULL);
232 
233   reactor->run_thread = pthread_self();
234   reactor->is_running = true;
235 
236   struct epoll_event events[MAX_EVENTS];
237   for (int i = 0; iterations == 0 || i < iterations; ++i) {
238     pthread_mutex_lock(&reactor->list_lock);
239     list_clear(reactor->invalidation_list);
240     pthread_mutex_unlock(&reactor->list_lock);
241 
242     int ret;
243     do {
244       ret = TEMP_FAILURE_RETRY(epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
245     } while (ret == -1 && errno == EINTR);
246 
247     if (ret == -1) {
248       LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno));
249       reactor->is_running = false;
250       return REACTOR_STATUS_ERROR;
251     }
252 
253     for (int j = 0; j < ret; ++j) {
254       // The event file descriptor is the only one that registers with
255       // a NULL data pointer. We use the NULL to identify it and break
256       // out of the reactor loop.
257       if (events[j].data.ptr == NULL) {
258         eventfd_t value;
259         eventfd_read(reactor->event_fd, &value);
260         reactor->is_running = false;
261         return REACTOR_STATUS_STOP;
262       }
263 
264       reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
265 
266       pthread_mutex_lock(&reactor->list_lock);
267       if (list_contains(reactor->invalidation_list, object)) {
268         pthread_mutex_unlock(&reactor->list_lock);
269         continue;
270       }
271 
272       // Downgrade the list lock to an object lock.
273       pthread_mutex_lock(&object->lock);
274       pthread_mutex_unlock(&reactor->list_lock);
275 
276       reactor->object_removed = false;
277       if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
278         object->read_ready(object->context);
279       if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
280         object->write_ready(object->context);
281       pthread_mutex_unlock(&object->lock);
282 
283       if (reactor->object_removed) {
284         pthread_mutex_destroy(&object->lock);
285         osi_free(object);
286       }
287     }
288   }
289 
290   reactor->is_running = false;
291   return REACTOR_STATUS_DONE;
292 }
293