• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2006 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <unistd.h>
25 #include <errno.h>
26 
27 #include <pulse/xmalloc.h>
28 #include <pulse/rtclock.h>
29 #include <pulse/timeval.h>
30 
31 #include <pulsecore/log.h>
32 #include <pulsecore/semaphore.h>
33 #include <pulsecore/macro.h>
34 #include <pulsecore/mutex.h>
35 #include <pulsecore/flist.h>
36 
37 #include "asyncmsgq.h"
38 #define PA_SNPRINTF_STR_LENGTH 256
39 
40 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
41 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
42 
43 struct asyncmsgq_item {
44     int code;
45     pa_msgobject *object;
46     void *userdata;
47     pa_free_cb_t free_cb;
48     int64_t offset;
49     pa_memchunk memchunk;
50     pa_semaphore *semaphore;
51     int ret;
52 };
53 
54 struct pa_asyncmsgq {
55     PA_REFCNT_DECLARE;
56     pa_asyncq *asyncq;
57     pa_mutex *mutex; /* only for the writer side */
58     uint32_t mark;
59 
60     struct asyncmsgq_item *current;
61 };
62 
pa_asyncmsgq_new(unsigned size)63 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
64     pa_asyncq *asyncq;
65     pa_asyncmsgq *a;
66     static uint32_t asyncmsgqNum = 0;
67 
68     asyncq = pa_asyncq_new(size);
69     if (!asyncq)
70         return NULL;
71 
72     a = pa_xnew(pa_asyncmsgq, 1);
73 
74     PA_REFCNT_INIT(a);
75     a->asyncq = asyncq;
76     pa_assert_se(a->mutex = pa_mutex_new(false, true));
77     a->current = NULL;
78     asyncmsgqNum += 1;
79     a->mark = asyncmsgqNum;
80 
81     return a;
82 }
83 
asyncmsgq_free(pa_asyncmsgq * a)84 static void asyncmsgq_free(pa_asyncmsgq *a) {
85     struct asyncmsgq_item *i;
86     pa_assert(a);
87 
88     while ((i = pa_asyncq_pop(a->asyncq, false))) {
89 
90         pa_assert(!i->semaphore);
91 
92         if (i->object)
93             pa_msgobject_unref(i->object);
94 
95         if (i->memchunk.memblock)
96             pa_memblock_unref(i->memchunk.memblock);
97 
98         if (i->free_cb)
99             i->free_cb(i->userdata);
100 
101         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
102             pa_xfree(i);
103     }
104 
105     pa_asyncq_free(a->asyncq, NULL);
106     pa_mutex_free(a->mutex);
107     pa_xfree(a);
108 }
109 
pa_asyncmsgq_ref(pa_asyncmsgq * q)110 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
111     pa_assert(PA_REFCNT_VALUE(q) > 0);
112 
113     PA_REFCNT_INC(q);
114     return q;
115 }
116 
pa_asyncmsgq_unref(pa_asyncmsgq * q)117 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
118     pa_assert(PA_REFCNT_VALUE(q) > 0);
119 
120     if (PA_REFCNT_DEC(q) <= 0)
121         asyncmsgq_free(q);
122 }
123 
pa_asyncmsgq_post(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk,pa_free_cb_t free_cb)124 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
125     struct asyncmsgq_item *i;
126     pa_assert(PA_REFCNT_VALUE(a) > 0);
127 
128     char t[PA_SNPRINTF_STR_LENGTH] = {0};
129     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] <msgqNo.%u> msg_wait_for_read:%u", code, a->mark,
130         PaAsyncqGetNumToRead(a->asyncq));
131     CallStart(t);
132 
133     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
134         i = pa_xnew(struct asyncmsgq_item, 1);
135 
136     i->code = code;
137     i->object = object ? pa_msgobject_ref(object) : NULL;
138     i->userdata = (void*) userdata;
139     i->free_cb = free_cb;
140     i->offset = offset;
141     if (chunk) {
142         pa_assert(chunk->memblock);
143         i->memchunk = *chunk;
144         pa_memblock_ref(i->memchunk.memblock);
145     } else
146         pa_memchunk_reset(&i->memchunk);
147     i->semaphore = NULL;
148 
149     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
150     pa_mutex_lock(a->mutex);
151     pa_asyncq_post(a->asyncq, i);
152     pa_mutex_unlock(a->mutex);
153     CallEnd();
154 }
155 
pa_asyncmsgq_send(pa_asyncmsgq * a,pa_msgobject * object,int code,const void * userdata,int64_t offset,const pa_memchunk * chunk)156 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
157     struct asyncmsgq_item i;
158     pa_assert(PA_REFCNT_VALUE(a) > 0);
159 
160     pa_usec_t startTime = pa_rtclock_now();
161     char t[PA_SNPRINTF_STR_LENGTH] = {0};
162     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] <msgqNo.%u> msg_wait_for_read:%u", code, a->mark,
163         PaAsyncqGetNumToRead(a->asyncq));
164     CallStart(t);
165     i.code = code;
166     i.object = object;
167     i.userdata = (void*) userdata;
168     i.free_cb = NULL;
169     i.ret = -1;
170     i.offset = offset;
171     if (chunk) {
172         pa_assert(chunk->memblock);
173         i.memchunk = *chunk;
174     } else
175         pa_memchunk_reset(&i.memchunk);
176 
177     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
178         i.semaphore = pa_semaphore_new(0);
179 
180     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
181     pa_mutex_lock(a->mutex);
182     pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
183     pa_mutex_unlock(a->mutex);
184 
185     pa_semaphore_wait(i.semaphore);
186 
187     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
188         pa_semaphore_free(i.semaphore);
189     CallEnd();
190     pa_usec_t executionTime = pa_rtclock_now() - startTime;
191     if (executionTime > OH_DAEMON_TIMEOUT_THRESHOLD_ON_US) { // too long block of daemon thread, dangerous
192         AUDIO_WARNING_LOG("Execution time of this msg is too long: qLen[%{public}u], MSG[%{public}d] " \
193             "(%{public}" PRIu64 "ms)",
194             PaAsyncqGetNumToRead(a->asyncq), code, executionTime / PA_USEC_PER_MSEC);
195     }
196     return i.ret;
197 }
198 
pa_asyncmsgq_get(pa_asyncmsgq * a,pa_msgobject ** object,int * code,void ** userdata,int64_t * offset,pa_memchunk * chunk,bool wait_op)199 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
200     pa_assert(PA_REFCNT_VALUE(a) > 0);
201     pa_assert(!a->current);
202 
203     if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
204 /*         pa_log("failure"); */
205         return -1;
206     }
207 
208 /*     pa_log("success"); */
209 
210     char t[PA_SNPRINTF_STR_LENGTH] = {0};
211     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] <msgqNo.%u> msg_wait_for_read:%u", a->current->code, a->mark,
212         PaAsyncqGetNumToRead(a->asyncq));
213     CallStart(t);
214     if (code)
215         *code = a->current->code;
216     if (userdata)
217         *userdata = a->current->userdata;
218     if (offset)
219         *offset = a->current->offset;
220     if (object) {
221         if ((*object = a->current->object))
222             pa_msgobject_assert_ref(*object);
223     }
224     if (chunk)
225         *chunk = a->current->memchunk;
226 
227 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
228 /*                  (void*) a, */
229 /*                  (void*) a->current->object, */
230 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
231 /*                  a->current->code, */
232 /*                  (void*) a->current->userdata, */
233 /*                  (unsigned long) a->current->memchunk.length); */
234 
235     CallEnd();
236     return 0;
237 }
238 
pa_asyncmsgq_done(pa_asyncmsgq * a,int ret)239 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
240     pa_assert(PA_REFCNT_VALUE(a) > 0);
241     pa_assert(a);
242     pa_assert(a->current);
243 
244     if (a->current->semaphore) {
245         a->current->ret = ret;
246         pa_semaphore_post(a->current->semaphore);
247     } else {
248 
249         if (a->current->free_cb)
250             a->current->free_cb(a->current->userdata);
251 
252         if (a->current->object)
253             pa_msgobject_unref(a->current->object);
254 
255         if (a->current->memchunk.memblock)
256             pa_memblock_unref(a->current->memchunk.memblock);
257 
258         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
259             pa_xfree(a->current);
260     }
261 
262     a->current = NULL;
263 }
264 
pa_asyncmsgq_wait_for(pa_asyncmsgq * a,int code)265 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
266     int c;
267     pa_assert(PA_REFCNT_VALUE(a) > 0);
268 
269     pa_asyncmsgq_ref(a);
270 
271     do {
272         pa_msgobject *o;
273         void *data;
274         int64_t offset;
275         pa_memchunk chunk;
276         int ret;
277 
278         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
279             return -1;
280 
281         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
282         pa_asyncmsgq_done(a, ret);
283 
284     } while (c != code);
285 
286     pa_asyncmsgq_unref(a);
287 
288     return 0;
289 }
290 
pa_asyncmsgq_process_one(pa_asyncmsgq * a)291 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
292     pa_msgobject *object;
293     int code;
294     void *data;
295     pa_memchunk chunk;
296     int64_t offset;
297     int ret;
298 
299     pa_assert(PA_REFCNT_VALUE(a) > 0);
300 
301     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
302         return 0;
303 
304     pa_asyncmsgq_ref(a);
305     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
306     pa_asyncmsgq_done(a, ret);
307     pa_asyncmsgq_unref(a);
308 
309     return 1;
310 }
311 
pa_asyncmsgq_read_fd(pa_asyncmsgq * a)312 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
313     pa_assert(PA_REFCNT_VALUE(a) > 0);
314 
315     return pa_asyncq_read_fd(a->asyncq);
316 }
317 
pa_asyncmsgq_read_before_poll(pa_asyncmsgq * a)318 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
319     pa_assert(PA_REFCNT_VALUE(a) > 0);
320 
321     return pa_asyncq_read_before_poll(a->asyncq);
322 }
323 
pa_asyncmsgq_read_after_poll(pa_asyncmsgq * a)324 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
325     pa_assert(PA_REFCNT_VALUE(a) > 0);
326 
327     pa_asyncq_read_after_poll(a->asyncq);
328 }
329 
pa_asyncmsgq_write_fd(pa_asyncmsgq * a)330 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
331     pa_assert(PA_REFCNT_VALUE(a) > 0);
332 
333     return pa_asyncq_write_fd(a->asyncq);
334 }
335 
pa_asyncmsgq_write_before_poll(pa_asyncmsgq * a)336 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
337     pa_assert(PA_REFCNT_VALUE(a) > 0);
338 
339     pa_asyncq_write_before_poll(a->asyncq);
340 }
341 
pa_asyncmsgq_write_after_poll(pa_asyncmsgq * a)342 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
343     pa_assert(PA_REFCNT_VALUE(a) > 0);
344 
345     pa_asyncq_write_after_poll(a->asyncq);
346 }
347 
pa_asyncmsgq_dispatch(pa_msgobject * object,int code,void * userdata,int64_t offset,pa_memchunk * memchunk)348 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
349 
350     if (object)
351         return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
352 
353     return 0;
354 }
355 
pa_asyncmsgq_flush(pa_asyncmsgq * a,bool run)356 void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
357     pa_assert(PA_REFCNT_VALUE(a) > 0);
358 
359     for (;;) {
360         pa_msgobject *object;
361         int code;
362         void *data;
363         int64_t offset;
364         pa_memchunk chunk;
365         int ret;
366 
367         if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
368             return;
369 
370         if (!run) {
371             pa_asyncmsgq_done(a, -1);
372             continue;
373         }
374 
375         pa_asyncmsgq_ref(a);
376         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
377         pa_asyncmsgq_done(a, ret);
378         pa_asyncmsgq_unref(a);
379     }
380 }
381 
pa_asyncmsgq_dispatching(pa_asyncmsgq * a)382 bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
383     pa_assert(PA_REFCNT_VALUE(a) > 0);
384 
385     return !!a->current;
386 }
387