• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2014 David Henningsson, Canonical Ltd.
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "srbchannel.h"
25 
26 #include <pulsecore/atomic.h>
27 #include <pulse/xmalloc.h>
28 
29 /* #define DEBUG_SRBCHANNEL */
30 
31 /* This ringbuffer might be useful in other contexts too, but
32  * right now it's only used inside the srbchannel, so let's keep it here
33  * for the time being. */
34 typedef struct pa_ringbuffer pa_ringbuffer;
35 
36 struct pa_ringbuffer {
37     pa_atomic_t *count; /* amount of data in the buffer */
38     int capacity;
39     uint8_t *memory;
40     int readindex, writeindex;
41 };
42 
pa_ringbuffer_peek(pa_ringbuffer * r,int * count)43 static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
44     int c = pa_atomic_load(r->count);
45 
46     if (r->readindex + c > r->capacity)
47         *count = r->capacity - r->readindex;
48     else
49         *count = c;
50 
51     return r->memory + r->readindex;
52 }
53 
54 /* Returns true only if the buffer was completely full before the drop. */
pa_ringbuffer_drop(pa_ringbuffer * r,int count)55 static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
56     bool b = pa_atomic_sub(r->count, count) >= r->capacity;
57 
58     r->readindex += count;
59     r->readindex %= r->capacity;
60 
61     return b;
62 }
63 
pa_ringbuffer_begin_write(pa_ringbuffer * r,int * count)64 static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
65     int c = pa_atomic_load(r->count);
66 
67     *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
68 
69     return r->memory + r->writeindex;
70 }
71 
pa_ringbuffer_end_write(pa_ringbuffer * r,int count)72 static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
73     pa_atomic_add(r->count, count);
74     r->writeindex += count;
75     r->writeindex %= r->capacity;
76 }
77 
78 struct pa_srbchannel {
79     pa_ringbuffer rb_read, rb_write;
80     pa_fdsem *sem_read, *sem_write;
81     pa_memblock *memblock;
82 
83     void *cb_userdata;
84     pa_srbchannel_cb_t callback;
85 
86     pa_io_event *read_event;
87     pa_defer_event *defer_event;
88     pa_mainloop_api *mainloop;
89 };
90 
91 /* We always listen to sem_read, and always signal on sem_write.
92  *
93  * This means we signal the same semaphore for two scenarios:
94  * 1) We have written something to our send buffer, and want the other
95  *    side to read it
96  * 2) We have read something from our receive buffer that was previously
97  *    completely full, and want the other side to continue writing
98 */
99 
pa_srbchannel_write(pa_srbchannel * sr,const void * data,size_t l)100 size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
101     size_t written = 0;
102 
103     while (l > 0) {
104         int towrite;
105         void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
106 
107         if ((size_t) towrite > l)
108             towrite = l;
109 
110         if (towrite == 0) {
111 #ifdef DEBUG_SRBCHANNEL
112             pa_log("srbchannel output buffer full");
113 #endif
114             break;
115         }
116 
117         memcpy(ptr, data, towrite);
118         pa_ringbuffer_end_write(&sr->rb_write, towrite);
119         written += towrite;
120         data = (uint8_t*) data + towrite;
121         l -= towrite;
122     }
123 #ifdef DEBUG_SRBCHANNEL
124     pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
125 #endif
126 
127     pa_fdsem_post(sr->sem_write);
128     return written;
129 }
130 
pa_srbchannel_read(pa_srbchannel * sr,void * data,size_t l)131 size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
132     size_t isread = 0;
133 
134     while (l > 0) {
135         int toread;
136         void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
137 
138         if ((size_t) toread > l)
139             toread = l;
140 
141         if (toread == 0)
142             break;
143 
144         memcpy(data, ptr, toread);
145 
146         if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
147 #ifdef DEBUG_SRBCHANNEL
148             pa_log("Read from full output buffer, signalling fdsem");
149 #endif
150             pa_fdsem_post(sr->sem_write);
151         }
152 
153         isread += toread;
154         data = (uint8_t*) data + toread;
155         l -= toread;
156     }
157 
158 #ifdef DEBUG_SRBCHANNEL
159     pa_log("Read %d bytes from srbchannel", (int) isread);
160 #endif
161 
162     return isread;
163 }
164 
165 /* This is the memory layout of the ringbuffer shm block. It is followed by
166    read and write ringbuffer memory. */
167 struct srbheader {
168     pa_atomic_t read_count;
169     pa_atomic_t write_count;
170 
171     pa_fdsem_data read_semdata;
172     pa_fdsem_data write_semdata;
173 
174     int capacity;
175     int readbuf_offset;
176     int writebuf_offset;
177 
178     /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
179 };
180 
srbchannel_rwloop(pa_srbchannel * sr)181 static void srbchannel_rwloop(pa_srbchannel* sr) {
182     do {
183 #ifdef DEBUG_SRBCHANNEL
184         int q;
185         pa_ringbuffer_peek(&sr->rb_read, &q);
186         pa_log("In rw loop from srbchannel, before callback, count = %d", q);
187 #endif
188 
189         if (sr->callback) {
190             if (!sr->callback(sr, sr->cb_userdata)) {
191 #ifdef DEBUG_SRBCHANNEL
192                 pa_log("Aborting read loop from srbchannel");
193 #endif
194                 return;
195             }
196         }
197 
198 #ifdef DEBUG_SRBCHANNEL
199         pa_ringbuffer_peek(&sr->rb_read, &q);
200         pa_log("In rw loop from srbchannel, after callback, count = %d", q);
201 #endif
202 
203     } while (pa_fdsem_before_poll(sr->sem_read) < 0);
204 }
205 
semread_cb(pa_mainloop_api * m,pa_io_event * e,int fd,pa_io_event_flags_t events,void * userdata)206 static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
207     pa_srbchannel* sr = userdata;
208 
209     pa_fdsem_after_poll(sr->sem_read);
210     srbchannel_rwloop(sr);
211 }
212 
defer_cb(pa_mainloop_api * m,pa_defer_event * e,void * userdata)213 static void defer_cb(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
214     pa_srbchannel* sr = userdata;
215 
216 #ifdef DEBUG_SRBCHANNEL
217     pa_log("Calling rw loop from deferred event");
218 #endif
219 
220     m->defer_enable(e, 0);
221     srbchannel_rwloop(sr);
222 }
223 
pa_srbchannel_new(pa_mainloop_api * m,pa_mempool * p)224 pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
225     int capacity;
226     int readfd;
227     struct srbheader *srh;
228 
229     pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
230     sr->mainloop = m;
231     sr->memblock = pa_memblock_new_pool(p, -1);
232     if (!sr->memblock)
233         goto fail;
234 
235     srh = pa_memblock_acquire(sr->memblock);
236     pa_zero(*srh);
237 
238     sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
239     srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
240 
241     capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
242 
243     sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
244     srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
245 
246     capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
247 
248     pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
249         (int) pa_memblock_get_length(sr->memblock), capacity);
250 
251     srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
252 
253     sr->rb_read.count = &srh->read_count;
254     sr->rb_write.count = &srh->write_count;
255 
256     sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
257     if (!sr->sem_read)
258         goto fail;
259 
260     sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
261     if (!sr->sem_write)
262         goto fail;
263 
264     readfd = pa_fdsem_get(sr->sem_read);
265 
266 #ifdef DEBUG_SRBCHANNEL
267     pa_log("Enabling io event on fd %d", readfd);
268 #endif
269 
270     sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
271     m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
272 
273     return sr;
274 
275 fail:
276     pa_srbchannel_free(sr);
277 
278     return NULL;
279 }
280 
pa_srbchannel_swap(pa_srbchannel * sr)281 static void pa_srbchannel_swap(pa_srbchannel *sr) {
282     pa_srbchannel temp = *sr;
283 
284     sr->sem_read = temp.sem_write;
285     sr->sem_write = temp.sem_read;
286     sr->rb_read = temp.rb_write;
287     sr->rb_write = temp.rb_read;
288 }
289 
pa_srbchannel_new_from_template(pa_mainloop_api * m,pa_srbchannel_template * t)290 pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
291 {
292     int temp;
293     struct srbheader *srh;
294     pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
295 
296     sr->mainloop = m;
297     sr->memblock = t->memblock;
298     pa_memblock_ref(sr->memblock);
299     srh = pa_memblock_acquire(sr->memblock);
300 
301     sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
302     sr->rb_read.count = &srh->read_count;
303     sr->rb_write.count = &srh->write_count;
304 
305     sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
306     sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
307 
308     sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
309     if (!sr->sem_read)
310         goto fail;
311 
312     sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
313     if (!sr->sem_write)
314         goto fail;
315 
316     pa_srbchannel_swap(sr);
317     temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
318 
319 #ifdef DEBUG_SRBCHANNEL
320     pa_log("Enabling io event on fd %d", t->readfd);
321 #endif
322 
323     sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
324     m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
325 
326     return sr;
327 
328 fail:
329     pa_srbchannel_free(sr);
330 
331     return NULL;
332 }
333 
pa_srbchannel_export(pa_srbchannel * sr,pa_srbchannel_template * t)334 void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
335     t->memblock = sr->memblock;
336     t->readfd = pa_fdsem_get(sr->sem_read);
337     t->writefd = pa_fdsem_get(sr->sem_write);
338 }
339 
pa_srbchannel_set_callback(pa_srbchannel * sr,pa_srbchannel_cb_t callback,void * userdata)340 void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
341     if (sr->callback)
342         pa_fdsem_after_poll(sr->sem_read);
343 
344     sr->callback = callback;
345     sr->cb_userdata = userdata;
346 
347     if (sr->callback) {
348         /* If there are events to be read already in the ringbuffer, we will not get any IO event for that,
349            because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */
350         if (!sr->defer_event)
351             sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr);
352         sr->mainloop->defer_enable(sr->defer_event, 1);
353     }
354 }
355 
pa_srbchannel_free(pa_srbchannel * sr)356 void pa_srbchannel_free(pa_srbchannel *sr)
357 {
358 #ifdef DEBUG_SRBCHANNEL
359     pa_log("Freeing srbchannel");
360 #endif
361     pa_assert(sr);
362 
363     if (sr->defer_event)
364         sr->mainloop->defer_free(sr->defer_event);
365     if (sr->read_event)
366         sr->mainloop->io_free(sr->read_event);
367 
368     if (sr->sem_read)
369         pa_fdsem_free(sr->sem_read);
370     if (sr->sem_write)
371         pa_fdsem_free(sr->sem_write);
372 
373     if (sr->memblock) {
374         pa_memblock_release(sr->memblock);
375         pa_memblock_unref(sr->memblock);
376     }
377 
378     pa_xfree(sr);
379 }
380