• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2006 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <unistd.h>
25 #include <errno.h>
26 
27 #include <pulse/xmalloc.h>
28 #include <pulse/rtclock.h>
29 #include <pulse/timeval.h>
30 
31 #include <pulsecore/macro.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/semaphore.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/mutex.h>
36 #include <pulsecore/flist.h>
37 
38 #include "asyncmsgq.h"
39 #include "log/audio_log.h"
40 #define PA_SNPRINTF_STR_LENGTH 256
41 
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
43 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
44 
45 struct asyncmsgq_item {
46     int code;
47     pa_msgobject *object;
48     void *userdata;
49     pa_free_cb_t free_cb;
50     int64_t offset;
51     pa_memchunk memchunk;
52     pa_semaphore *semaphore;
53     int ret;
54 };
55 
56 struct pa_asyncmsgq {
57     PA_REFCNT_DECLARE;
58     pa_asyncq *asyncq;
59     pa_mutex *mutex; /* only for the writer side */
60     uint32_t mark;
61 
62     struct asyncmsgq_item *current;
63 };
64 
pa_asyncmsgq_new(unsigned size)65 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
66     pa_asyncq *asyncq;
67     pa_asyncmsgq *a;
68     static uint32_t asyncmsgqNum = 0;
69 
70     asyncq = pa_asyncq_new(size);
71     if (!asyncq)
72         return NULL;
73 
74     a = pa_xnew(pa_asyncmsgq, 1);
75 
76     PA_REFCNT_INIT(a);
77     a->asyncq = asyncq;
78     pa_assert_se(a->mutex = pa_mutex_new(false, true));
79     a->current = NULL;
80     asyncmsgqNum += 1;
81     a->mark = asyncmsgqNum;
82 
83     return a;
84 }
85 
asyncmsgq_free(pa_asyncmsgq * a)86 static void asyncmsgq_free(pa_asyncmsgq *a) {
87     struct asyncmsgq_item *i;
88     pa_assert(a);
89 
90     while ((i = pa_asyncq_pop(a->asyncq, false))) {
91 
92         pa_assert(!i->semaphore);
93 
94         if (i->object)
95             pa_msgobject_unref(i->object);
96 
97         if (i->memchunk.memblock)
98             pa_memblock_unref(i->memchunk.memblock);
99 
100         if (i->free_cb)
101             i->free_cb(i->userdata);
102 
103         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
104             pa_xfree(i);
105     }
106 
107     pa_asyncq_free(a->asyncq, NULL);
108     pa_mutex_free(a->mutex);
109     pa_xfree(a);
110 }
111 
pa_asyncmsgq_ref(pa_asyncmsgq * q)112 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
113     pa_assert(PA_REFCNT_VALUE(q) > 0);
114 
115     PA_REFCNT_INC(q);
116     return q;
117 }
118 
pa_asyncmsgq_unref(pa_asyncmsgq * q)119 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
120     pa_assert(PA_REFCNT_VALUE(q) > 0);
121 
122     if (PA_REFCNT_DEC(q) <= 0)
123         asyncmsgq_free(q);
124 }
125 
pa_asyncmsgq_post(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk,pa_free_cb_t free_cb)126 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
127     struct asyncmsgq_item *i;
128     pa_assert(PA_REFCNT_VALUE(a) > 0);
129 
130     char t[PA_SNPRINTF_STR_LENGTH] = {0};
131     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] <msgqNo.%u> msg_wait_for_read:%u", code, a->mark,
132         PaAsyncqGetNumToRead(a->asyncq));
133     CallStart(t);
134 
135     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
136         i = pa_xnew(struct asyncmsgq_item, 1);
137 
138     i->code = code;
139     i->object = object ? pa_msgobject_ref(object) : NULL;
140     i->userdata = (void*) userdata;
141     i->free_cb = free_cb;
142     i->offset = offset;
143     if (chunk) {
144         pa_assert(chunk->memblock);
145         i->memchunk = *chunk;
146         pa_memblock_ref(i->memchunk.memblock);
147     } else
148         pa_memchunk_reset(&i->memchunk);
149     i->semaphore = NULL;
150 
151     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
152     pa_mutex_lock(a->mutex);
153     pa_asyncq_post(a->asyncq, i);
154     pa_mutex_unlock(a->mutex);
155     CallEnd();
156 }
157 
pa_asyncmsgq_send(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk)158 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
159     struct asyncmsgq_item i;
160     pa_assert(PA_REFCNT_VALUE(a) > 0);
161 
162     pa_usec_t startTime = pa_rtclock_now();
163     char t[PA_SNPRINTF_STR_LENGTH] = {0};
164     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] <msgqNo.%u> msg_wait_for_read:%u", code, a->mark,
165         PaAsyncqGetNumToRead(a->asyncq));
166     CallStart(t);
167     i.code = code;
168     i.object = object;
169     i.userdata = (void*) userdata;
170     i.free_cb = NULL;
171     i.ret = -1;
172     i.offset = offset;
173     if (chunk) {
174         pa_assert(chunk->memblock);
175         i.memchunk = *chunk;
176     } else
177         pa_memchunk_reset(&i.memchunk);
178 
179     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
180         i.semaphore = pa_semaphore_new(0);
181 
182     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
183     pa_mutex_lock(a->mutex);
184     pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
185     pa_mutex_unlock(a->mutex);
186 
187     pa_semaphore_wait(i.semaphore);
188 
189     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
190         pa_semaphore_free(i.semaphore);
191     CallEnd();
192     pa_usec_t executionTime = pa_rtclock_now() - startTime;
193     if (executionTime > OH_DAEMON_TIMEOUT_THRESHOLD_ON_US) { // too long block of daemon thread, dangerous
194         AUDIO_WARNING_LOG("Execution time of this msg is too long: qLen[%{public}u], MSG[%{public}d] " \
195             "(%{public}" PRIu64 "ms)",
196             PaAsyncqGetNumToRead(a->asyncq), code, executionTime / PA_USEC_PER_MSEC);
197     }
198     return i.ret;
199 }
200 
pa_asyncmsgq_get(pa_asyncmsgq * a,pa_msgobject ** object,int * code,void ** userdata,int64_t * offset,pa_memchunk * chunk,bool wait_op)201 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
202     pa_assert(PA_REFCNT_VALUE(a) > 0);
203     pa_assert(!a->current);
204 
205     if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
206 /*         pa_log("failure"); */
207         return -1;
208     }
209 
210 /*     pa_log("success"); */
211 
212     char t[PA_SNPRINTF_STR_LENGTH] = {0};
213     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] <msgqNo.%u> msg_wait_for_read:%u", a->current->code, a->mark,
214         PaAsyncqGetNumToRead(a->asyncq));
215     CallStart(t);
216     if (code)
217         *code = a->current->code;
218     if (userdata)
219         *userdata = a->current->userdata;
220     if (offset)
221         *offset = a->current->offset;
222     if (object) {
223         if ((*object = a->current->object))
224             pa_msgobject_assert_ref(*object);
225     }
226     if (chunk)
227         *chunk = a->current->memchunk;
228 
229 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
230 /*                  (void*) a, */
231 /*                  (void*) a->current->object, */
232 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
233 /*                  a->current->code, */
234 /*                  (void*) a->current->userdata, */
235 /*                  (unsigned long) a->current->memchunk.length); */
236 
237     CallEnd();
238     return 0;
239 }
240 
pa_asyncmsgq_done(pa_asyncmsgq * a,int ret)241 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
242     pa_assert(PA_REFCNT_VALUE(a) > 0);
243     pa_assert(a);
244     pa_assert(a->current);
245 
246     if (a->current->semaphore) {
247         a->current->ret = ret;
248         pa_semaphore_post(a->current->semaphore);
249     } else {
250 
251         if (a->current->free_cb)
252             a->current->free_cb(a->current->userdata);
253 
254         if (a->current->object)
255             pa_msgobject_unref(a->current->object);
256 
257         if (a->current->memchunk.memblock)
258             pa_memblock_unref(a->current->memchunk.memblock);
259 
260         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
261             pa_xfree(a->current);
262     }
263 
264     a->current = NULL;
265 }
266 
pa_asyncmsgq_wait_for(pa_asyncmsgq * a,int code)267 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
268     int c;
269     pa_assert(PA_REFCNT_VALUE(a) > 0);
270 
271     pa_asyncmsgq_ref(a);
272 
273     do {
274         pa_msgobject *o;
275         void *data;
276         int64_t offset;
277         pa_memchunk chunk;
278         int ret;
279 
280         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
281             return -1;
282 
283         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
284         pa_asyncmsgq_done(a, ret);
285 
286     } while (c != code);
287 
288     pa_asyncmsgq_unref(a);
289 
290     return 0;
291 }
292 
pa_asyncmsgq_process_one(pa_asyncmsgq * a)293 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
294     pa_msgobject *object;
295     int code;
296     void *data;
297     pa_memchunk chunk;
298     int64_t offset;
299     int ret;
300 
301     pa_assert(PA_REFCNT_VALUE(a) > 0);
302 
303     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
304         return 0;
305 
306     pa_asyncmsgq_ref(a);
307     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
308     pa_asyncmsgq_done(a, ret);
309     pa_asyncmsgq_unref(a);
310 
311     return 1;
312 }
313 
pa_asyncmsgq_read_fd(pa_asyncmsgq * a)314 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
315     pa_assert(PA_REFCNT_VALUE(a) > 0);
316 
317     return pa_asyncq_read_fd(a->asyncq);
318 }
319 
pa_asyncmsgq_read_before_poll(pa_asyncmsgq * a)320 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
321     pa_assert(PA_REFCNT_VALUE(a) > 0);
322 
323     return pa_asyncq_read_before_poll(a->asyncq);
324 }
325 
pa_asyncmsgq_read_after_poll(pa_asyncmsgq * a)326 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
327     pa_assert(PA_REFCNT_VALUE(a) > 0);
328 
329     pa_asyncq_read_after_poll(a->asyncq);
330 }
331 
pa_asyncmsgq_write_fd(pa_asyncmsgq * a)332 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
333     pa_assert(PA_REFCNT_VALUE(a) > 0);
334 
335     return pa_asyncq_write_fd(a->asyncq);
336 }
337 
pa_asyncmsgq_write_before_poll(pa_asyncmsgq * a)338 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
339     pa_assert(PA_REFCNT_VALUE(a) > 0);
340 
341     pa_asyncq_write_before_poll(a->asyncq);
342 }
343 
pa_asyncmsgq_write_after_poll(pa_asyncmsgq * a)344 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
345     pa_assert(PA_REFCNT_VALUE(a) > 0);
346 
347     pa_asyncq_write_after_poll(a->asyncq);
348 }
349 
pa_asyncmsgq_dispatch(pa_msgobject * object,int code,void * userdata,int64_t offset,pa_memchunk * memchunk)350 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
351 
352     if (object)
353         return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
354 
355     return 0;
356 }
357 
pa_asyncmsgq_flush(pa_asyncmsgq * a,bool run)358 void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
359     pa_assert(PA_REFCNT_VALUE(a) > 0);
360 
361     for (;;) {
362         pa_msgobject *object;
363         int code;
364         void *data;
365         int64_t offset;
366         pa_memchunk chunk;
367         int ret;
368 
369         if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
370             return;
371 
372         if (!run) {
373             pa_asyncmsgq_done(a, -1);
374             continue;
375         }
376 
377         pa_asyncmsgq_ref(a);
378         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
379         pa_asyncmsgq_done(a, ret);
380         pa_asyncmsgq_unref(a);
381     }
382 }
383 
pa_asyncmsgq_dispatching(pa_asyncmsgq * a)384 bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
385     pa_assert(PA_REFCNT_VALUE(a) > 0);
386 
387     return !!a->current;
388 }
389