• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2006 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <unistd.h>
25 #include <errno.h>
26 
27 #include <pulse/xmalloc.h>
28 
29 #include <pulsecore/macro.h>
30 #include <pulsecore/log.h>
31 #include <pulsecore/semaphore.h>
32 #include <pulsecore/macro.h>
33 #include <pulsecore/mutex.h>
34 #include <pulsecore/flist.h>
35 
36 #include "asyncmsgq.h"
37 
38 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
39 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
40 
41 struct asyncmsgq_item {
42     int code;
43     pa_msgobject *object;
44     void *userdata;
45     pa_free_cb_t free_cb;
46     int64_t offset;
47     pa_memchunk memchunk;
48     pa_semaphore *semaphore;
49     int ret;
50 };
51 
52 struct pa_asyncmsgq {
53     PA_REFCNT_DECLARE;
54     pa_asyncq *asyncq;
55     pa_mutex *mutex; /* only for the writer side */
56 
57     struct asyncmsgq_item *current;
58 };
59 
pa_asyncmsgq_new(unsigned size)60 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
61     pa_asyncq *asyncq;
62     pa_asyncmsgq *a;
63 
64     asyncq = pa_asyncq_new(size);
65     if (!asyncq)
66         return NULL;
67 
68     a = pa_xnew(pa_asyncmsgq, 1);
69 
70     PA_REFCNT_INIT(a);
71     a->asyncq = asyncq;
72     pa_assert_se(a->mutex = pa_mutex_new(false, true));
73     a->current = NULL;
74 
75     return a;
76 }
77 
asyncmsgq_free(pa_asyncmsgq * a)78 static void asyncmsgq_free(pa_asyncmsgq *a) {
79     struct asyncmsgq_item *i;
80     pa_assert(a);
81 
82     while ((i = pa_asyncq_pop(a->asyncq, false))) {
83 
84         pa_assert(!i->semaphore);
85 
86         if (i->object)
87             pa_msgobject_unref(i->object);
88 
89         if (i->memchunk.memblock)
90             pa_memblock_unref(i->memchunk.memblock);
91 
92         if (i->free_cb)
93             i->free_cb(i->userdata);
94 
95         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
96             pa_xfree(i);
97     }
98 
99     pa_asyncq_free(a->asyncq, NULL);
100     pa_mutex_free(a->mutex);
101     pa_xfree(a);
102 }
103 
pa_asyncmsgq_ref(pa_asyncmsgq * q)104 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
105     pa_assert(PA_REFCNT_VALUE(q) > 0);
106 
107     PA_REFCNT_INC(q);
108     return q;
109 }
110 
pa_asyncmsgq_unref(pa_asyncmsgq * q)111 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
112     pa_assert(PA_REFCNT_VALUE(q) > 0);
113 
114     if (PA_REFCNT_DEC(q) <= 0)
115         asyncmsgq_free(q);
116 }
117 
pa_asyncmsgq_post(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk,pa_free_cb_t free_cb)118 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
119     struct asyncmsgq_item *i;
120     pa_assert(PA_REFCNT_VALUE(a) > 0);
121 
122     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
123         i = pa_xnew(struct asyncmsgq_item, 1);
124 
125     i->code = code;
126     i->object = object ? pa_msgobject_ref(object) : NULL;
127     i->userdata = (void*) userdata;
128     i->free_cb = free_cb;
129     i->offset = offset;
130     if (chunk) {
131         pa_assert(chunk->memblock);
132         i->memchunk = *chunk;
133         pa_memblock_ref(i->memchunk.memblock);
134     } else
135         pa_memchunk_reset(&i->memchunk);
136     i->semaphore = NULL;
137 
138     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
139     pa_mutex_lock(a->mutex);
140     pa_asyncq_post(a->asyncq, i);
141     pa_mutex_unlock(a->mutex);
142 }
143 
pa_asyncmsgq_send(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk)144 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
145     struct asyncmsgq_item i;
146     pa_assert(PA_REFCNT_VALUE(a) > 0);
147 
148     i.code = code;
149     i.object = object;
150     i.userdata = (void*) userdata;
151     i.free_cb = NULL;
152     i.ret = -1;
153     i.offset = offset;
154     if (chunk) {
155         pa_assert(chunk->memblock);
156         i.memchunk = *chunk;
157     } else
158         pa_memchunk_reset(&i.memchunk);
159 
160     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
161         i.semaphore = pa_semaphore_new(0);
162 
163     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
164     pa_mutex_lock(a->mutex);
165     pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
166     pa_mutex_unlock(a->mutex);
167 
168     pa_semaphore_wait(i.semaphore);
169 
170     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
171         pa_semaphore_free(i.semaphore);
172 
173     return i.ret;
174 }
175 
pa_asyncmsgq_get(pa_asyncmsgq * a,pa_msgobject ** object,int * code,void ** userdata,int64_t * offset,pa_memchunk * chunk,bool wait_op)176 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
177     pa_assert(PA_REFCNT_VALUE(a) > 0);
178     pa_assert(!a->current);
179 
180     if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
181 /*         pa_log("failure"); */
182         return -1;
183     }
184 
185 /*     pa_log("success"); */
186 
187     if (code)
188         *code = a->current->code;
189     if (userdata)
190         *userdata = a->current->userdata;
191     if (offset)
192         *offset = a->current->offset;
193     if (object) {
194         if ((*object = a->current->object))
195             pa_msgobject_assert_ref(*object);
196     }
197     if (chunk)
198         *chunk = a->current->memchunk;
199 
200 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
201 /*                  (void*) a, */
202 /*                  (void*) a->current->object, */
203 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
204 /*                  a->current->code, */
205 /*                  (void*) a->current->userdata, */
206 /*                  (unsigned long) a->current->memchunk.length); */
207 
208     return 0;
209 }
210 
pa_asyncmsgq_done(pa_asyncmsgq * a,int ret)211 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
212     pa_assert(PA_REFCNT_VALUE(a) > 0);
213     pa_assert(a);
214     pa_assert(a->current);
215 
216     if (a->current->semaphore) {
217         a->current->ret = ret;
218         pa_semaphore_post(a->current->semaphore);
219     } else {
220 
221         if (a->current->free_cb)
222             a->current->free_cb(a->current->userdata);
223 
224         if (a->current->object)
225             pa_msgobject_unref(a->current->object);
226 
227         if (a->current->memchunk.memblock)
228             pa_memblock_unref(a->current->memchunk.memblock);
229 
230         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
231             pa_xfree(a->current);
232     }
233 
234     a->current = NULL;
235 }
236 
pa_asyncmsgq_wait_for(pa_asyncmsgq * a,int code)237 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
238     int c;
239     pa_assert(PA_REFCNT_VALUE(a) > 0);
240 
241     pa_asyncmsgq_ref(a);
242 
243     do {
244         pa_msgobject *o;
245         void *data;
246         int64_t offset;
247         pa_memchunk chunk;
248         int ret;
249 
250         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
251             return -1;
252 
253         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
254         pa_asyncmsgq_done(a, ret);
255 
256     } while (c != code);
257 
258     pa_asyncmsgq_unref(a);
259 
260     return 0;
261 }
262 
pa_asyncmsgq_process_one(pa_asyncmsgq * a)263 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
264     pa_msgobject *object;
265     int code;
266     void *data;
267     pa_memchunk chunk;
268     int64_t offset;
269     int ret;
270 
271     pa_assert(PA_REFCNT_VALUE(a) > 0);
272 
273     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
274         return 0;
275 
276     pa_asyncmsgq_ref(a);
277     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
278     pa_asyncmsgq_done(a, ret);
279     pa_asyncmsgq_unref(a);
280 
281     return 1;
282 }
283 
pa_asyncmsgq_read_fd(pa_asyncmsgq * a)284 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
285     pa_assert(PA_REFCNT_VALUE(a) > 0);
286 
287     return pa_asyncq_read_fd(a->asyncq);
288 }
289 
pa_asyncmsgq_read_before_poll(pa_asyncmsgq * a)290 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
291     pa_assert(PA_REFCNT_VALUE(a) > 0);
292 
293     return pa_asyncq_read_before_poll(a->asyncq);
294 }
295 
pa_asyncmsgq_read_after_poll(pa_asyncmsgq * a)296 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
297     pa_assert(PA_REFCNT_VALUE(a) > 0);
298 
299     pa_asyncq_read_after_poll(a->asyncq);
300 }
301 
pa_asyncmsgq_write_fd(pa_asyncmsgq * a)302 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
303     pa_assert(PA_REFCNT_VALUE(a) > 0);
304 
305     return pa_asyncq_write_fd(a->asyncq);
306 }
307 
pa_asyncmsgq_write_before_poll(pa_asyncmsgq * a)308 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
309     pa_assert(PA_REFCNT_VALUE(a) > 0);
310 
311     pa_asyncq_write_before_poll(a->asyncq);
312 }
313 
pa_asyncmsgq_write_after_poll(pa_asyncmsgq * a)314 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
315     pa_assert(PA_REFCNT_VALUE(a) > 0);
316 
317     pa_asyncq_write_after_poll(a->asyncq);
318 }
319 
pa_asyncmsgq_dispatch(pa_msgobject * object,int code,void * userdata,int64_t offset,pa_memchunk * memchunk)320 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
321 
322     if (object)
323         return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
324 
325     return 0;
326 }
327 
pa_asyncmsgq_flush(pa_asyncmsgq * a,bool run)328 void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
329     pa_assert(PA_REFCNT_VALUE(a) > 0);
330 
331     for (;;) {
332         pa_msgobject *object;
333         int code;
334         void *data;
335         int64_t offset;
336         pa_memchunk chunk;
337         int ret;
338 
339         if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
340             return;
341 
342         if (!run) {
343             pa_asyncmsgq_done(a, -1);
344             continue;
345         }
346 
347         pa_asyncmsgq_ref(a);
348         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
349         pa_asyncmsgq_done(a, ret);
350         pa_asyncmsgq_unref(a);
351     }
352 }
353 
pa_asyncmsgq_dispatching(pa_asyncmsgq * a)354 bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
355     pa_assert(PA_REFCNT_VALUE(a) > 0);
356 
357     return !!a->current;
358 }
359