• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #define LOG_TAG "bt_osi_eager_reader"
20 
21 #include <assert.h>
22 #include <errno.h>
23 #include <stddef.h>
24 #include <string.h>
25 #include <sys/eventfd.h>
26 
27 #include "osi/include/allocator.h"
28 #include "osi/include/eager_reader.h"
29 #include "osi/include/fixed_queue.h"
30 #include "osi/include/osi.h"
31 #include "osi/include/log.h"
32 #include "osi/include/reactor.h"
33 
// Fallback definition for toolchains whose <sys/eventfd.h> does not provide
// EFD_SEMAPHORE.
// NOTE(review): the macro is not referenced by the code visible in this file
// (the eventfd is created with flags 0) -- confirm whether it is still needed.
34 #if !defined(EFD_SEMAPHORE)
35 #  define EFD_SEMAPHORE (1 << 0)
36 #endif
37 
// One chunk of data read from the inbound fd, held until a consumer drains it.
38 typedef struct {
39   size_t length; // number of valid bytes stored in data
40   size_t offset; // number of bytes of data already copied out to a consumer
41   uint8_t data[]; // payload storage, allocated in the same block as this header
42 } data_buffer_t;
43 
// State for one eager reader: a dedicated thread reads inbound_fd into
// buffers, and consumers pull the bytes back out with eager_reader_read.
44 struct eager_reader_t {
45   int bytes_available_fd; // eventfd whose counter holds the number of buffered bytes available (created with flags 0, so a read drains the whole counter)
46   int inbound_fd; // file descriptor the read thread pulls data from
47 
48   const allocator_t *allocator; // allocates and frees the data buffers
49   size_t buffer_size; // payload capacity of each data buffer, in bytes
50   fixed_queue_t *buffers; // filled buffers waiting to be consumed
51   data_buffer_t *current_buffer; // partially consumed buffer; already removed from the queue
52 
53   thread_t *inbound_read_thread; // thread whose reactor watches inbound_fd
54   reactor_object_t *inbound_read_object; // registration of inbound_fd on that reactor
55 
56   reactor_object_t *outbound_registration; // registration of bytes_available_fd on the consumer's reactor; NULL when not registered
57   eager_reader_cb outbound_read_ready; // consumer callback invoked when bytes_available_fd becomes readable
58   void *outbound_context; // opaque pointer handed back to outbound_read_ready
59 };
60 
// File-local helpers, defined at the bottom of the file.
61 static bool has_byte(const eager_reader_t *reader); // non-blocking check of bytes_available_fd
62 static void inbound_data_waiting(void *context); // reactor callback: inbound_fd is readable
63 static void internal_outbound_read_ready(void *context); // reactor callback: bytes_available_fd is readable
64 
eager_reader_new(int fd_to_read,const allocator_t * allocator,size_t buffer_size,size_t max_buffer_count,const char * thread_name)65 eager_reader_t *eager_reader_new(
66     int fd_to_read,
67     const allocator_t *allocator,
68     size_t buffer_size,
69     size_t max_buffer_count,
70     const char *thread_name) {
71 
72   assert(fd_to_read != INVALID_FD);
73   assert(allocator != NULL);
74   assert(buffer_size > 0);
75   assert(max_buffer_count > 0);
76   assert(thread_name != NULL && *thread_name != '\0');
77 
78   eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
79   if (!ret) {
80     LOG_ERROR("%s unable to allocate memory for new eager_reader.", __func__);
81     goto error;
82   }
83 
84   ret->allocator = allocator;
85   ret->inbound_fd = fd_to_read;
86 
87   ret->bytes_available_fd = eventfd(0, 0);
88   if (ret->bytes_available_fd == INVALID_FD) {
89     LOG_ERROR("%s unable to create output reading semaphore.", __func__);
90     goto error;
91   }
92 
93   ret->buffer_size = buffer_size;
94 
95   ret->buffers = fixed_queue_new(max_buffer_count);
96   if (!ret->buffers) {
97     LOG_ERROR("%s unable to create buffers queue.", __func__);
98     goto error;
99   }
100 
101   ret->inbound_read_thread = thread_new(thread_name);
102   if (!ret->inbound_read_thread) {
103     LOG_ERROR("%s unable to make reading thread.", __func__);
104     goto error;
105   }
106 
107   ret->inbound_read_object = reactor_register(
108     thread_get_reactor(ret->inbound_read_thread),
109     fd_to_read,
110     ret,
111     inbound_data_waiting,
112     NULL
113   );
114 
115   return ret;
116 
117 error:;
118   eager_reader_free(ret);
119   return NULL;
120 }
121 
// Releases |reader| and everything it owns. Passing NULL is a no-op.
void eager_reader_free(eager_reader_t *reader) {
  if (reader == NULL)
    return;

  // Detach the consumer side first.
  eager_reader_unregister(reader);

  // The inbound registration only exists if construction got that far.
  if (reader->inbound_read_object != NULL)
    reactor_unregister(reader->inbound_read_object);

  if (reader->bytes_available_fd != INVALID_FD)
    close(reader->bytes_available_fd);

  const allocator_t *buffer_allocator = reader->allocator;

  // A partially consumed buffer has already left the queue, so the queue
  // teardown below will not reach it.
  data_buffer_t *partial = reader->current_buffer;
  if (partial != NULL)
    buffer_allocator->free(partial);

  fixed_queue_free(reader->buffers, buffer_allocator->free);
  thread_free(reader->inbound_read_thread);
  osi_free(reader);
}
144 
// Arranges for |read_cb| to be called with |context| from |reactor| whenever
// the reader has bytes available. Replaces any existing registration.
void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
  assert(reader != NULL);
  assert(reactor != NULL);
  assert(read_cb != NULL);

  // Drop a previous registration, if there is one.
  eager_reader_unregister(reader);

  reader->outbound_context = context;
  reader->outbound_read_ready = read_cb;

  reactor_object_t *registration = reactor_register(
      reactor,
      reader->bytes_available_fd,
      reader,
      internal_outbound_read_ready,
      NULL);
  reader->outbound_registration = registration;
}
157 
// Removes the consumer-side registration made by eager_reader_register.
// Does nothing when the reader is not currently registered.
void eager_reader_unregister(eager_reader_t *reader) {
  assert(reader != NULL);

  reactor_object_t *registration = reader->outbound_registration;
  if (registration == NULL)
    return;

  reactor_unregister(registration);
  reader->outbound_registration = NULL;
}
166 
167 // SEE HEADER FOR THREAD SAFETY NOTE
eager_reader_read(eager_reader_t * reader,uint8_t * buffer,size_t max_size,bool block)168 size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {
169   assert(reader != NULL);
170   assert(buffer != NULL);
171 
172   // If the caller wants nonblocking behavior, poll to see if we have
173   // any bytes available before reading.
174   if (!block && !has_byte(reader))
175     return 0;
176 
177   // Find out how many bytes we have available in our various buffers.
178   eventfd_t bytes_available;
179   if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
180     LOG_ERROR("%s unable to read semaphore for output data.", __func__);
181     return 0;
182   }
183 
184   if (max_size > bytes_available)
185     max_size = bytes_available;
186 
187   size_t bytes_consumed = 0;
188   while (bytes_consumed < max_size) {
189     if (!reader->current_buffer)
190       reader->current_buffer = fixed_queue_dequeue(reader->buffers);
191 
192     size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
193     if (bytes_to_copy > (max_size - bytes_consumed))
194       bytes_to_copy = max_size - bytes_consumed;
195 
196     memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
197     bytes_consumed += bytes_to_copy;
198     reader->current_buffer->offset += bytes_to_copy;
199 
200     if (reader->current_buffer->offset >= reader->current_buffer->length) {
201       reader->allocator->free(reader->current_buffer);
202       reader->current_buffer = NULL;
203     }
204   }
205 
206   bytes_available -= bytes_consumed;
207   if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
208     LOG_ERROR("%s unable to write back bytes available for output data.", __func__);
209   }
210 
211   return bytes_consumed;
212 }
213 
eager_reader_get_read_thread(const eager_reader_t * reader)214 thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) {
215   assert(reader != NULL);
216   return reader->inbound_read_thread;
217 }
218 
has_byte(const eager_reader_t * reader)219 static bool has_byte(const eager_reader_t *reader) {
220   assert(reader != NULL);
221 
222   fd_set read_fds;
223   FD_ZERO(&read_fds);
224   FD_SET(reader->bytes_available_fd, &read_fds);
225 
226   // Immediate timeout
227   struct timeval timeout;
228   timeout.tv_sec = 0;
229   timeout.tv_usec = 0;
230 
231   TEMP_FAILURE_RETRY(select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout));
232   return FD_ISSET(reader->bytes_available_fd, &read_fds);
233 }
234 
inbound_data_waiting(void * context)235 static void inbound_data_waiting(void *context) {
236   eager_reader_t *reader = (eager_reader_t *)context;
237 
238   data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
239   if (!buffer) {
240     LOG_ERROR("%s couldn't aquire memory for inbound data buffer.", __func__);
241     return;
242   }
243 
244   buffer->length = 0;
245   buffer->offset = 0;
246 
247   int bytes_read = TEMP_FAILURE_RETRY(read(reader->inbound_fd, buffer->data, reader->buffer_size));
248   if (bytes_read > 0) {
249     // Save the data for later
250     buffer->length = bytes_read;
251     fixed_queue_enqueue(reader->buffers, buffer);
252 
253     // Tell consumers data is available by incrementing
254     // the semaphore by the number of bytes we just read
255     eventfd_write(reader->bytes_available_fd, bytes_read);
256   } else {
257     if (bytes_read == 0)
258       LOG_WARN("%s fd said bytes existed, but none were found.", __func__);
259     else
260       LOG_WARN("%s unable to read from file descriptor: %s", __func__, strerror(errno));
261 
262     reader->allocator->free(buffer);
263   }
264 }
265 
internal_outbound_read_ready(void * context)266 static void internal_outbound_read_ready(void *context) {
267   assert(context != NULL);
268 
269   eager_reader_t *reader = (eager_reader_t *)context;
270   reader->outbound_read_ready(reader, reader->outbound_context);
271 }
272