1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <sys/types.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <errno.h>
29
30 #include <pulse/xmalloc.h>
31 #include <pulse/timeval.h>
32
33 #include <pulsecore/poll.h>
34 #include <pulsecore/core-error.h>
35 #include <pulsecore/core-rtclock.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/llist.h>
38 #include <pulsecore/flist.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/ratelimit.h>
41 #include <pulse/rtclock.h>
42
43 #include "rtpoll.h"
44
45 /* #define DEBUG_TIMING */
46
47 struct pa_rtpoll {
48 struct pollfd *pollfd, *pollfd2;
49 unsigned n_pollfd_alloc, n_pollfd_used;
50
51 struct timeval next_elapse;
52 bool timer_enabled:1;
53
54 bool scan_for_dead:1;
55 bool running:1;
56 bool rebuild_needed:1;
57 bool quit:1;
58 bool timer_elapsed:1;
59
60 #ifdef DEBUG_TIMING
61 pa_usec_t timestamp;
62 pa_usec_t slept, awake;
63 #endif
64
65 PA_LLIST_HEAD(pa_rtpoll_item, items);
66 };
67
68 struct pa_rtpoll_item {
69 pa_rtpoll *rtpoll;
70 bool dead;
71
72 pa_rtpoll_priority_t priority;
73
74 struct pollfd *pollfd;
75 unsigned n_pollfd;
76
77 int (*work_cb)(pa_rtpoll_item *i);
78 int (*before_cb)(pa_rtpoll_item *i);
79 void (*after_cb)(pa_rtpoll_item *i);
80 void *work_userdata;
81 void *before_userdata;
82 void *after_userdata;
83
84 PA_LLIST_FIELDS(pa_rtpoll_item);
85 };
86
87 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
88
pa_rtpoll_new(void)89 pa_rtpoll *pa_rtpoll_new(void) {
90 pa_rtpoll *p;
91
92 p = pa_xnew0(pa_rtpoll, 1);
93
94 p->n_pollfd_alloc = 32;
95 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
96 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
97
98 #ifdef DEBUG_TIMING
99 p->timestamp = pa_rtclock_now();
100 #endif
101
102 return p;
103 }
104
/* Lays the pollfd array out afresh so that every item owns a contiguous
 * slice of it, in list order. The new layout is written into the spare array
 * (pollfd2), which is then swapped with the active one. Both arrays are
 * enlarged when the items need more entries than are allocated. */
static void rtpoll_rebuild(pa_rtpoll *p) {
    struct pollfd *e, *swap;
    pa_rtpoll_item *item;
    bool grown = false;

    pa_assert(p);

    p->rebuild_needed = false;

    if (p->n_pollfd_used > p->n_pollfd_alloc) {
        /* Out of room: allocate twice what is needed and grow the spare array now */
        p->n_pollfd_alloc = p->n_pollfd_used * 2;
        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
        grown = true;
    }

    e = p->pollfd2;

    for (item = p->items; item; item = item->next) {
        size_t n_bytes;

        if (item->n_pollfd == 0) {
            item->pollfd = NULL;
            continue;
        }

        n_bytes = item->n_pollfd * sizeof(struct pollfd);

        /* Carry over an existing slice; a slice that was never assigned starts zeroed */
        if (item->pollfd)
            memcpy(e, item->pollfd, n_bytes);
        else
            memset(e, 0, n_bytes);

        item->pollfd = e;
        e += item->n_pollfd;
    }

    pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);

    swap = p->pollfd;
    p->pollfd = p->pollfd2;
    p->pollfd2 = swap;

    /* The previously active array was the copy source above, so it is only
     * resized here, after the swap has retired it. */
    if (grown)
        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
}
149
/* Unlinks an item from its loop, gives back its pollfd entries and recycles
 * (or frees) the item. The loop's pollfd array is flagged for rebuilding. */
static void rtpoll_item_destroy(pa_rtpoll_item *i) {
    pa_rtpoll *owner;

    pa_assert(i);

    owner = i->rtpoll;

    PA_LLIST_REMOVE(pa_rtpoll_item, owner->items, i);
    owner->n_pollfd_used -= i->n_pollfd;
    owner->rebuild_needed = true;

    /* Offer the memory to the free list; release it if the push fails */
    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}
166
/* Destroys every item still registered with the loop, then releases both
 * pollfd arrays and the loop object itself. */
void pa_rtpoll_free(pa_rtpoll *p) {
    pa_rtpoll_item *head;

    pa_assert(p);

    /* Each destroy call unlinks the current head, so this drains the list */
    while ((head = p->items))
        rtpoll_item_destroy(head);

    pa_xfree(p->pollfd);
    pa_xfree(p->pollfd2);

    pa_xfree(p);
}
178
/* Clears the revents field of every pollfd entry owned by the item. Does
 * nothing when the item has no pollfd slice. */
static void reset_revents(pa_rtpoll_item *i) {
    struct pollfd *fds;
    unsigned count, idx;

    pa_assert(i);

    fds = pa_rtpoll_item_get_pollfd(i, &count);
    if (!fds)
        return;

    for (idx = 0; idx < count; idx++)
        fds[idx].revents = 0;
}
191
/* Clears the revents fields of all items that are not marked dead. */
static void reset_all_revents(pa_rtpoll *p) {
    pa_rtpoll_item *item;

    pa_assert(p);

    for (item = p->items; item; item = item->next)
        if (!item->dead)
            reset_revents(item);
}
205
/* Runs one iteration of the poll loop:
 *
 *   1. call the work callbacks; any non-zero result ends the iteration
 *   2. call the before callbacks; if one returns non-zero (or quit is set)
 *      the after callbacks of the items preceding it are called and the
 *      iteration ends without polling
 *   3. rebuild the pollfd array if needed and poll, bounded by the timer
 *      (or without blocking when quit is set)
 *   4. call the after callbacks
 *
 * Items with priority PA_RTPOLL_NEVER or above get no callbacks. Items that
 * were freed while the loop was running are destroyed before returning.
 *
 * Returns a negative value if a callback returned one or poll failed with
 * an error other than EAGAIN/EINTR. Otherwise returns 0 if quit has been
 * requested and 1 if it has not. */
int pa_rtpoll_run(pa_rtpoll *p) {
    pa_rtpoll_item *item;
    struct timeval timeout;
    bool bounded;
    int ret = 0;

    pa_assert(p);
    pa_assert(!p->running);

#ifdef DEBUG_TIMING
    pa_log("rtpoll_run");
#endif

    p->running = true;
    p->timer_elapsed = false;

    /* Phase 1: give every item the chance to do some work */
    for (item = p->items; item && item->priority < PA_RTPOLL_NEVER; item = item->next) {
        int k;

        if (item->dead || !item->work_cb)
            continue;

        if (p->quit) {
#ifdef DEBUG_TIMING
            pa_log("rtpoll finish");
#endif
            goto finish;
        }

        k = item->work_cb(item);
        if (k == 0)
            continue;

        if (k < 0)
            ret = k;
#ifdef DEBUG_TIMING
        pa_log("rtpoll finish");
#endif
        goto finish;
    }

    /* Phase 2: announce that we are about to sleep */
    for (item = p->items; item && item->priority < PA_RTPOLL_NEVER; item = item->next) {
        pa_rtpoll_item *undo;
        int k = 0;

        if (item->dead || !item->before_cb)
            continue;

        /* before_cb is only invoked while quit is unset */
        if (!p->quit && (k = item->before_cb(item)) == 0)
            continue;

        /* This item vetoes the poll, so rewind: walk back over the items
         * in front of it and call their after callbacks */
        for (undo = item->prev; undo; undo = undo->prev)
            if (!undo->dead && undo->after_cb)
                undo->after_cb(undo);

        if (k < 0)
            ret = k;
#ifdef DEBUG_TIMING
        pa_log("rtpoll finish");
#endif
        goto finish;
    }

    if (p->rebuild_needed)
        rtpoll_rebuild(p);

    pa_zero(timeout);

    /* Work out how long we may sleep; the timeout stays zero if the timer
     * is already due */
    if (!p->quit && p->timer_enabled) {
        struct timeval now;
        pa_rtclock_get(&now);

        if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
            pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
    }

#ifdef DEBUG_TIMING
    {
        pa_usec_t now = pa_rtclock_now();
        p->awake = now - p->timestamp;
        p->timestamp = now;
        if (!p->quit && p->timer_enabled)
            pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
        else if (p->quit)
            pa_log("poll timeout is ZERO");
        else
            pa_log("poll timeout is FOREVER");
    }
#endif

    /* Phase 3: sleep. Without quit or a timer the poll blocks indefinitely */
    bounded = p->quit || p->timer_enabled;

#ifdef HAVE_PPOLL
    {
        struct timespec ts;
        ts.tv_sec = timeout.tv_sec;
        ts.tv_nsec = timeout.tv_usec * 1000;
        ret = ppoll(p->pollfd, p->n_pollfd_used, bounded ? &ts : NULL, NULL);
    }
#else
    ret = pa_poll(p->pollfd, p->n_pollfd_used, bounded ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
#endif

    p->timer_elapsed = (ret == 0);

#ifdef DEBUG_TIMING
    {
        pa_usec_t now = pa_rtclock_now();
        p->slept = now - p->timestamp;
        p->timestamp = now;

        pa_log("Process time %llu ms; sleep time %llu ms",
               (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
               (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
    }
#endif

    if (ret < 0) {
        /* EAGAIN and EINTR are not reported as errors */
        if (errno != EAGAIN && errno != EINTR)
            pa_log_error("poll(): %s", pa_cstrerror(errno));
        else
            ret = 0;

        reset_all_revents(p);
    }

    /* Phase 4: announce that the sleep is over */
    for (item = p->items; item && item->priority < PA_RTPOLL_NEVER; item = item->next)
        if (!item->dead && item->after_cb)
            item->after_cb(item);

finish:

    p->running = false;

    /* Destroy the items that were freed while the loop was running */
    if (p->scan_for_dead) {
        pa_rtpoll_item *next;

        p->scan_for_dead = false;

        for (item = p->items; item; item = next) {
            next = item->next;

            if (item->dead)
                rtpoll_item_destroy(item);
        }
    }

    if (ret < 0)
        return ret;

    return !p->quit;
}
376
/* Enables the wakeup timer and stores usec, converted to a timeval, as the
 * time at which it elapses. */
void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
    pa_assert(p);

    p->timer_enabled = true;
    pa_timeval_store(&p->next_elapse, usec);
}
383
/* Enables the wakeup timer and arms it to elapse usec microseconds after the
 * current time reported by pa_rtclock_get(). Delays above one hour trip an
 * assertion. */
void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
    struct timeval *when;

    pa_assert(p);

    /* Scheduling a timeout for more than an hour is very very suspicious */
    pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);

    when = &p->next_elapse;
    pa_rtclock_get(when);
    pa_timeval_add(when, usec);

    p->timer_enabled = true;
}
394
/* Disables the wakeup timer and clears the stored wakeup time. */
void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
    pa_assert(p);

    p->timer_enabled = false;
    pa_zero(p->next_elapse);
}
401
pa_rtpoll_item_new(pa_rtpoll * p,pa_rtpoll_priority_t prio,unsigned n_fds)402 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
403 pa_rtpoll_item *i, *j, *l = NULL;
404
405 pa_assert(p);
406
407 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
408 i = pa_xnew(pa_rtpoll_item, 1);
409
410 i->rtpoll = p;
411 i->dead = false;
412 i->n_pollfd = n_fds;
413 i->pollfd = NULL;
414 i->priority = prio;
415
416 i->work_userdata = NULL;
417 i->before_userdata = NULL;
418 i->work_userdata = NULL;
419 i->before_cb = NULL;
420 i->after_cb = NULL;
421 i->work_cb = NULL;
422
423 for (j = p->items; j; j = j->next) {
424 if (prio <= j->priority)
425 break;
426
427 l = j;
428 }
429
430 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
431
432 if (n_fds > 0) {
433 p->rebuild_needed = 1;
434 p->n_pollfd_used += n_fds;
435 }
436
437 return i;
438 }
439
/* Releases an item. While its loop is inside pa_rtpoll_run() the item is only
 * marked dead and destroyed at the end of that run; otherwise it is destroyed
 * right away. */
void pa_rtpoll_item_free(pa_rtpoll_item *i) {
    pa_assert(i);

    if (!i->rtpoll->running) {
        rtpoll_item_destroy(i);
        return;
    }

    /* The running loop may still be walking the list: defer the destruction */
    i->dead = true;
    i->rtpoll->scan_for_dead = true;
}
451
/* Returns the item's slice of the pollfd array. If the item has pollfd
 * entries and the array is flagged as outdated, it is rebuilt first. When
 * n_fds is not NULL it receives the number of entries in the slice. */
struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
    pa_assert(i);

    if (i->n_pollfd > 0 && i->rtpoll->rebuild_needed)
        rtpoll_rebuild(i->rtpoll);

    if (n_fds)
        *n_fds = i->n_pollfd;

    return i->pollfd;
}
464
/* Installs the callback that pa_rtpoll_run() invokes right before polling,
 * together with its userdata. A non-zero return value from the callback makes
 * the run skip the poll. Not allowed on PA_RTPOLL_NEVER items. */
void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->before_userdata = userdata;
    i->before_cb = before_cb;
}
472
/* Installs the callback that pa_rtpoll_run() invokes after polling (or when
 * rewinding an aborted poll), together with its userdata. Not allowed on
 * PA_RTPOLL_NEVER items. */
void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->after_userdata = userdata;
    i->after_cb = after_cb;
}
480
/* Installs the callback that pa_rtpoll_run() invokes at the start of each
 * run, together with its userdata. A non-zero return value from the callback
 * ends the run early. Not allowed on PA_RTPOLL_NEVER items. */
void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->work_userdata = userdata;
    i->work_cb = work_cb;
}
488
/* Returns the userdata pointer stored alongside the item's work callback. */
void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) {
    void *data;

    pa_assert(i);

    data = i->work_userdata;
    return data;
}
494
/* Before callback for fdsem items. Returns 1 when pa_fdsem_before_poll()
 * reports a negative value, which makes pa_rtpoll_run() skip the poll for
 * this iteration; returns 0 otherwise. */
static int fdsem_before(pa_rtpoll_item *i) {
    /* Same precondition check as the other item callbacks in this file */
    pa_assert(i);

    if (pa_fdsem_before_poll(i->before_userdata) < 0)
        return 1; /* 1 means immediate restart of the loop */

    return 0;
}
502
/* After callback for fdsem items: checks that nothing but POLLIN was
 * reported on the item's descriptor, then notifies the fdsem. */
static void fdsem_after(pa_rtpoll_item *i) {
    pa_fdsem *sem;

    pa_assert(i);
    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);

    sem = i->after_userdata;
    pa_fdsem_after_poll(sem);
}
509
pa_rtpoll_item_new_fdsem(pa_rtpoll * p,pa_rtpoll_priority_t prio,pa_fdsem * f)510 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
511 pa_rtpoll_item *i;
512 struct pollfd *pollfd;
513
514 pa_assert(p);
515 pa_assert(f);
516
517 i = pa_rtpoll_item_new(p, prio, 1);
518
519 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
520
521 pollfd->fd = pa_fdsem_get(f);
522 pollfd->events = POLLIN;
523
524 pa_rtpoll_item_set_before_callback(i, fdsem_before, f);
525 pa_rtpoll_item_set_after_callback(i, fdsem_after, f);
526
527 return i;
528 }
529
/* Before callback for the read side of an asyncmsgq. Returns 1 when
 * pa_asyncmsgq_read_before_poll() reports a negative value, 0 otherwise. */
static int asyncmsgq_read_before(pa_rtpoll_item *i) {
    pa_assert(i);

    /* 1 means immediate restart of the loop */
    return pa_asyncmsgq_read_before_poll(i->before_userdata) < 0 ? 1 : 0;
}
538
/* After callback for the read side of an asyncmsgq: checks that nothing but
 * POLLIN was reported on the item's descriptor, then notifies the queue. */
static void asyncmsgq_read_after(pa_rtpoll_item *i) {
    pa_asyncmsgq *queue;

    pa_assert(i);
    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);

    queue = i->after_userdata;
    pa_asyncmsgq_read_after_poll(queue);
}
545
/* Work callback for the read side of an asyncmsgq. Fetches at most one
 * message without waiting and handles it.
 *
 * Returns 0 when no message was fetched and 1 when one was handled. A
 * PA_MESSAGE_SHUTDOWN message without a target object sets the loop's quit
 * flag instead of being dispatched. */
static int asyncmsgq_read_work(pa_rtpoll_item *i) {
    pa_msgobject *object;
    int code;
    void *data;
    pa_memchunk chunk;
    int64_t offset;
    int ret;

    pa_assert(i);

    if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) != 0)
        return 0;

    if (!object && code == PA_MESSAGE_SHUTDOWN) {
        pa_asyncmsgq_done(i->work_userdata, 0);
        /* Ask the loop to exit: with quit set, pa_rtpoll_run() returns 0
         * rather than 1 */
        i->rtpoll->quit = true;
        return 1;
    }

    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
    pa_asyncmsgq_done(i->work_userdata, ret);

    return 1;
}
573
pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll * p,pa_rtpoll_priority_t prio,pa_asyncmsgq * q)574 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
575 pa_rtpoll_item *i;
576 struct pollfd *pollfd;
577
578 pa_assert(p);
579 pa_assert(q);
580
581 i = pa_rtpoll_item_new(p, prio, 1);
582
583 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
584 pollfd->fd = pa_asyncmsgq_read_fd(q);
585 pollfd->events = POLLIN;
586
587 pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q);
588 pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q);
589 pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q);
590
591 return i;
592 }
593
/* Before callback for the write side of an asyncmsgq: notifies the queue and
 * always returns 0. */
static int asyncmsgq_write_before(pa_rtpoll_item *i) {
    pa_asyncmsgq *queue;

    pa_assert(i);

    queue = i->before_userdata;
    pa_asyncmsgq_write_before_poll(queue);

    return 0;
}
600
/* After callback for the write side of an asyncmsgq: checks that nothing but
 * POLLIN was reported on the item's descriptor, then notifies the queue. */
static void asyncmsgq_write_after(pa_rtpoll_item *i) {
    pa_asyncmsgq *queue;

    pa_assert(i);
    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);

    queue = i->after_userdata;
    pa_asyncmsgq_write_after_poll(queue);
}
607
pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll * p,pa_rtpoll_priority_t prio,pa_asyncmsgq * q)608 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
609 pa_rtpoll_item *i;
610 struct pollfd *pollfd;
611
612 pa_assert(p);
613 pa_assert(q);
614
615 i = pa_rtpoll_item_new(p, prio, 1);
616
617 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
618 pollfd->fd = pa_asyncmsgq_write_fd(q);
619 pollfd->events = POLLIN;
620
621 pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q);
622 pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q);
623
624 return i;
625 }
626
/* Reports whether poll() returned 0 in the most recent pa_rtpoll_run(). The
 * flag is cleared at the start of every run. */
bool pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
    bool elapsed;

    pa_assert(p);

    elapsed = p->timer_elapsed;
    return elapsed;
}
632