• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2008 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <stdio.h>
25 #include <errno.h>
26 #include <math.h>
27 
28 #include <pulse/rtclock.h>
29 #include <pulse/timeval.h>
30 #include <pulse/util.h>
31 #include <pulse/xmalloc.h>
32 
33 #include <pulsecore/macro.h>
34 #include <pulsecore/module.h>
35 #include <pulsecore/llist.h>
36 #include <pulsecore/sink.h>
37 #include <pulsecore/sink-input.h>
38 #include <pulsecore/memblockq.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/namereg.h>
44 #include <pulsecore/thread.h>
45 #include <pulsecore/thread-mq.h>
46 #include <pulsecore/rtpoll.h>
47 
48 #ifdef USE_SMOOTHER_2
49 #include <pulsecore/time-smoother_2.h>
50 #else
51 #include <pulsecore/time-smoother.h>
52 #endif
53 
54 #include <pulsecore/strlist.h>
55 
56 PA_MODULE_AUTHOR("Lennart Poettering");
57 PA_MODULE_DESCRIPTION("Combine multiple sinks to one");
58 PA_MODULE_VERSION(PACKAGE_VERSION);
59 PA_MODULE_LOAD_ONCE(false);
60 PA_MODULE_USAGE(
61         "sink_name=<name for the sink> "
62         "sink_properties=<properties for the sink> "
63         "slaves=<slave sinks> "
64         "adjust_time=<how often to readjust rates in s> "
65         "resample_method=<method> "
66         "format=<sample format> "
67         "rate=<sample rate> "
68         "channels=<number of channels> "
69         "channel_map=<channel map>"
70         "remix=<boolean>");
71 
72 #define DEFAULT_SINK_NAME "combined"
73 
74 #define MEMBLOCKQ_MAXLENGTH (1024*1024*16)
75 
76 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
77 
78 #define BLOCK_USEC (PA_USEC_PER_MSEC * 200)
79 
80 static const char* const valid_modargs[] = {
81     "sink_name",
82     "sink_properties",
83     "slaves",
84     "adjust_time",
85     "resample_method",
86     "format",
87     "rate",
88     "channels",
89     "channel_map",
90     "remix",
91     NULL
92 };
93 
94 struct output {
95     struct userdata *userdata;
96 
97     pa_sink *sink;
98     pa_sink_input *sink_input;
99     bool ignore_state_change;
100 
101     /* This message queue is only for POST messages, i.e. the messages that
102      * carry audio data from the sink thread to the output thread. The POST
103      * messages need to be handled in a separate queue, because the queue is
104      * processed not only in the output thread mainloop, but also inside the
105      * sink input pop() callback. Processing other messages (such as
106      * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
107      * one reason why it's not safe is that messages that generate rewind
108      * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
109      * in the pop() callback. */
110     pa_asyncmsgq *audio_inq;
111 
112     /* This message queue is for all other messages than POST from the sink
113      * thread to the output thread (currently "all other messages" means just
114      * the SET_REQUESTED_LATENCY message). */
115     pa_asyncmsgq *control_inq;
116 
117     /* Message queue from the output thread to the sink thread. */
118     pa_asyncmsgq *outq;
119 
120     pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
121     pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
122     pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
123 
124     pa_memblockq *memblockq;
125 
126     /* For communication of the stream latencies to the main thread */
127     pa_usec_t total_latency;
128     struct {
129         pa_usec_t timestamp;
130         pa_usec_t sink_latency;
131         size_t output_memblockq_size;
132         uint64_t receive_counter;
133     } latency_snapshot;
134 
135     uint64_t receive_counter;
136 
137     /* For communication of the stream parameters to the sink thread */
138     pa_atomic_t max_request;
139     pa_atomic_t max_latency;
140     pa_atomic_t min_latency;
141 
142     PA_LLIST_FIELDS(struct output);
143 };
144 
145 struct userdata {
146     pa_core *core;
147     pa_module *module;
148     pa_sink *sink;
149 
150     pa_thread *thread;
151     pa_thread_mq thread_mq;
152     pa_rtpoll *rtpoll;
153 
154     pa_time_event *time_event;
155     pa_usec_t adjust_time;
156 
157     bool automatic;
158     bool auto_desc;
159 
160     pa_strlist *unlinked_slaves;
161 
162     pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot;
163 
164     pa_resample_method_t resample_method;
165 
166     pa_usec_t block_usec;
167     pa_usec_t default_min_latency;
168     pa_usec_t default_max_latency;
169 
170     pa_idxset* outputs; /* managed in main context */
171 
172     bool remix;
173 
174     struct {
175         PA_LLIST_HEAD(struct output, active_outputs); /* managed in IO thread context */
176         pa_atomic_t running;  /* we cache that value here, so that every thread can query it cheaply */
177         pa_usec_t timestamp;
178         bool in_null_mode;
179 #ifdef USE_SMOOTHER_2
180         pa_smoother_2 *smoother;
181 #else
182         pa_smoother *smoother;
183 #endif
184         uint64_t counter;
185 
186         uint64_t snapshot_counter;
187         pa_usec_t snapshot_time;
188 
189         pa_usec_t render_timestamp;
190     } thread_info;
191 };
192 
193 struct sink_snapshot {
194     pa_usec_t timestamp;
195     uint64_t send_counter;
196 };
197 
198 enum {
199     SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
200     SINK_MESSAGE_REMOVE_OUTPUT,
201     SINK_MESSAGE_NEED,
202     SINK_MESSAGE_UPDATE_LATENCY,
203     SINK_MESSAGE_UPDATE_MAX_REQUEST,
204     SINK_MESSAGE_UPDATE_LATENCY_RANGE,
205     SINK_MESSAGE_GET_SNAPSHOT
206 };
207 
208 enum {
209     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
210     SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY,
211     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
212 };
213 
214 static void output_disable(struct output *o);
215 static void output_enable(struct output *o);
216 static void output_free(struct output *o);
217 static int output_create_sink_input(struct output *o);
218 
219 /* rate controller, called from main context
220  * - maximum deviation from base rate is less than 1%
221  * - controller step size is limited to 2.01‰
222  * - exhibits hunting with USB or Bluetooth devices
223  */
rate_controller(struct output * o,uint32_t base_rate,uint32_t old_rate,int32_t latency_difference_usec)224 static uint32_t rate_controller(
225                 struct output *o,
226                 uint32_t base_rate, uint32_t old_rate,
227                 int32_t latency_difference_usec) {
228 
229     double new_rate, new_rate_1, new_rate_2;
230     double min_cycles_1, min_cycles_2;
231 
232     /* Calculate next rate that is not more than 2‰ away from the last rate */
233     min_cycles_1 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.002 + 1;
234     new_rate_1 = old_rate + base_rate * (double)latency_difference_usec / min_cycles_1 / o->userdata->adjust_time;
235 
236     /* Calculate best rate to correct the current latency offset, limit at
237      * 1% difference from base_rate */
238     min_cycles_2 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.01 + 1;
239     new_rate_2 = (double)base_rate * (1.0 + (double)latency_difference_usec / min_cycles_2 / o->userdata->adjust_time);
240 
241     /* Choose the rate that is nearer to base_rate */
242     new_rate = new_rate_2;
243     if (fabs(new_rate_1 - base_rate) < fabs(new_rate_2 - base_rate))
244         new_rate = new_rate_1;
245 
246     return (uint32_t)(new_rate + 0.5);
247 }
248 
adjust_rates(struct userdata * u)249 static void adjust_rates(struct userdata *u) {
250     struct output *o;
251     struct sink_snapshot rdata;
252     pa_usec_t avg_total_latency = 0;
253     pa_usec_t target_latency = 0;
254     pa_usec_t max_sink_latency = 0;
255     pa_usec_t min_total_latency = (pa_usec_t)-1;
256     uint32_t base_rate;
257     uint32_t idx;
258     unsigned n = 0;
259     pa_usec_t now;
260     struct output *o_max;
261 
262     pa_assert(u);
263     pa_sink_assert_ref(u->sink);
264 
265     if (pa_idxset_size(u->outputs) <= 0)
266         return;
267 
268     if (u->sink->state != PA_SINK_RUNNING)
269         return;
270 
271     /* Get sink snapshot */
272     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL);
273 
274     /* The sink snapshot time is the time when the last data was rendered.
275      * Latency is calculated for that point in time. */
276     now = rdata.timestamp;
277 
278     /* Sink snapshot is not yet valid. */
279     if (!now)
280         return;
281 
282     PA_IDXSET_FOREACH(o, u->outputs, idx) {
283         pa_usec_t snapshot_latency;
284         int64_t time_difference;
285 
286         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
287             continue;
288 
289         /* The difference may become negative, because it is probable, that the last
290          * render time was before the sink input snapshot. In this case, the sink
291          * had some more latency at the render time, so subtracting the value still
292          * gives the right result. */
293         time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp;
294 
295         /* Latency at sink snapshot time is sink input snapshot latency minus time
296          * passed between the two snapshots. */
297         snapshot_latency = o->latency_snapshot.sink_latency
298                            + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec)
299                            - time_difference;
300 
301         /* Add the data that was sent between taking the sink input snapshot
302          * and the sink snapshot. */
303         snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec);
304 
305         /* This is the current combined latency of the slave sink and the related
306          * memblockq at the time of the sink snapshot. */
307         o->total_latency = snapshot_latency;
308         avg_total_latency += snapshot_latency;
309 
310         /* Get max_sink_latency and min_total_latency for target selection. */
311         if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency)
312             min_total_latency = o->total_latency;
313 
314         if (o->latency_snapshot.sink_latency > max_sink_latency) {
315             max_sink_latency = o->latency_snapshot.sink_latency;
316             o_max = o;
317         }
318 
319         /* Debug output */
320         pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC);
321 
322         if (o->total_latency > 10*PA_USEC_PER_SEC)
323             pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
324 
325         n++;
326     }
327 
328     /* If there is no valid output there is nothing to do. */
329     if (min_total_latency == (pa_usec_t) -1)
330         return;
331 
332     avg_total_latency /= n;
333 
334     /* The target selection ensures, that at least one of the
335      * sinks will use the base rate and all other sinks are set
336      * relative to it. */
337     if (max_sink_latency > min_total_latency)
338         target_latency = o_max->total_latency;
339     else
340         target_latency = min_total_latency;
341 
342     pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC);
343     pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC);
344 
345     base_rate = u->sink->sample_spec.rate;
346 
347     /* Calculate and set rates for the sink inputs. */
348     PA_IDXSET_FOREACH(o, u->outputs, idx) {
349         uint32_t new_rate;
350         int32_t latency_difference;
351 
352         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
353             continue;
354 
355         latency_difference = (int64_t)o->total_latency - (int64_t)target_latency;
356         new_rate = rate_controller(o, base_rate, o->sink_input->sample_spec.rate, latency_difference);
357 
358         pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate);
359         pa_sink_input_set_rate(o->sink_input, new_rate);
360     }
361 
362     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, NULL, (int64_t) avg_total_latency, NULL);
363 }
364 
time_callback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)365 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
366     struct userdata *u = userdata;
367 
368     pa_assert(u);
369     pa_assert(a);
370     pa_assert(u->time_event == e);
371 
372     if (u->sink->state == PA_SINK_SUSPENDED) {
373         u->core->mainloop->time_free(e);
374         u->time_event = NULL;
375     } else {
376         struct output *o;
377         uint32_t idx;
378 
379         pa_core_rttime_restart(u->core, e, pa_rtclock_now() + u->adjust_time);
380 
381         /* Get latency snapshots */
382         PA_IDXSET_FOREACH(o, u->outputs, idx) {
383             pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
384         }
385 
386     }
387     adjust_rates(u);
388 }
389 
process_render_null(struct userdata * u,pa_usec_t now)390 static void process_render_null(struct userdata *u, pa_usec_t now) {
391     size_t ate = 0;
392 
393     pa_assert(u);
394     pa_assert(u->sink->thread_info.state == PA_SINK_RUNNING);
395 
396     if (u->thread_info.in_null_mode)
397         u->thread_info.timestamp = now;
398 
399     while (u->thread_info.timestamp < now + u->block_usec) {
400         pa_memchunk chunk;
401 
402         pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
403         pa_memblock_unref(chunk.memblock);
404 
405         u->thread_info.counter += chunk.length;
406 
407 /*         pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */
408         u->thread_info.timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
409 
410         ate += chunk.length;
411 
412         if (ate >= u->sink->thread_info.max_request)
413             break;
414     }
415 
416 /*     pa_log_debug("Ate in sum %lu bytes (of %lu)", (unsigned long) ate, (unsigned long) nbytes); */
417 
418 #ifdef USE_SMOOTHER_2
419     pa_smoother_2_put(u->thread_info.smoother, now,
420                     u->thread_info.counter - pa_usec_to_bytes(u->thread_info.timestamp - now, &u->sink->sample_spec));
421 #else
422      pa_smoother_put(u->thread_info.smoother, now,
423                      pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec) - (u->thread_info.timestamp - now));
424 #endif
425 }
426 
thread_func(void * userdata)427 static void thread_func(void *userdata) {
428     struct userdata *u = userdata;
429 
430     pa_assert(u);
431 
432     pa_log_debug("Thread starting up");
433 
434     if (u->core->realtime_scheduling)
435         pa_thread_make_realtime(u->core->realtime_priority+1);
436 
437     pa_thread_mq_install(&u->thread_mq);
438 
439     u->thread_info.timestamp = pa_rtclock_now();
440     u->thread_info.in_null_mode = false;
441 
442     for (;;) {
443         int ret;
444 
445         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
446             pa_sink_process_rewind(u->sink, 0);
447 
448         /* If no outputs are connected, render some data and drop it immediately. */
449         if (u->sink->thread_info.state == PA_SINK_RUNNING && !u->thread_info.active_outputs) {
450             pa_usec_t now;
451 
452             now = pa_rtclock_now();
453 
454             if (!u->thread_info.in_null_mode || u->thread_info.timestamp <= now)
455                 process_render_null(u, now);
456 
457             pa_rtpoll_set_timer_absolute(u->rtpoll, u->thread_info.timestamp);
458             u->thread_info.in_null_mode = true;
459         } else {
460             pa_rtpoll_set_timer_disabled(u->rtpoll);
461             u->thread_info.in_null_mode = false;
462         }
463 
464         /* Hmm, nothing to do. Let's sleep */
465         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
466             pa_log_info("pa_rtpoll_run() = %i", ret);
467             goto fail;
468         }
469 
470         if (ret == 0)
471             goto finish;
472     }
473 
474 fail:
475     /* If this was no regular exit from the loop we have to continue
476      * processing messages until we received PA_MESSAGE_SHUTDOWN */
477     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
478     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
479 
480 finish:
481     pa_log_debug("Thread shutting down");
482 }
483 
484 /* Called from combine sink I/O thread context */
render_memblock(struct userdata * u,struct output * o,size_t length)485 static void render_memblock(struct userdata *u, struct output *o, size_t length) {
486     pa_assert(u);
487     pa_assert(o);
488 
489     /* We are run by the sink thread, on behalf of an output (o). The
490      * output is waiting for us, hence it is safe to access its
491      * mainblockq and asyncmsgq directly. */
492 
493     /* If we are not running, we cannot produce any data */
494     if (!pa_atomic_load(&u->thread_info.running))
495         return;
496 
497     /* Maybe there's some data in the requesting output's queue
498      * now? */
499     while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
500         ;
501 
502     /* Ok, now let's prepare some data if we really have to. Save the
503      * the time for latency calculations. */
504     u->thread_info.render_timestamp = pa_rtclock_now();
505 
506     while (!pa_memblockq_is_readable(o->memblockq)) {
507         struct output *j;
508         pa_memchunk chunk;
509 
510         /* Render data! */
511         pa_sink_render(u->sink, length, &chunk);
512 
513         u->thread_info.counter += chunk.length;
514         o->receive_counter += chunk.length;
515 
516         /* OK, let's send this data to the other threads */
517         PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
518             if (j == o)
519                 continue;
520 
521             pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
522         }
523 
524         /* And place it directly into the requesting output's queue */
525         pa_memblockq_push_align(o->memblockq, &chunk);
526         pa_memblock_unref(chunk.memblock);
527     }
528 }
529 
530 /* Called from I/O thread context */
request_memblock(struct output * o,size_t length)531 static void request_memblock(struct output *o, size_t length) {
532     pa_assert(o);
533     pa_sink_input_assert_ref(o->sink_input);
534     pa_sink_assert_ref(o->userdata->sink);
535 
536     /* If another thread already prepared some data we received
537      * the data over the asyncmsgq, hence let's first process
538      * it. */
539     while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
540         ;
541 
542     /* Check whether we're now readable */
543     if (pa_memblockq_is_readable(o->memblockq))
544         return;
545 
546     /* OK, we need to prepare new data, but only if the sink is actually running */
547     if (pa_atomic_load(&o->userdata->thread_info.running))
548         pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, (int64_t) length, NULL);
549 }
550 
551 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input * i,size_t nbytes,pa_memchunk * chunk)552 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
553     struct output *o;
554 
555     pa_sink_input_assert_ref(i);
556     pa_assert_se(o = i->userdata);
557 
558     /* If necessary, get some new data */
559     request_memblock(o, nbytes);
560 
561     /* pa_log("%s q size is %u + %u (%u/%u)", */
562     /*        i->sink->name, */
563     /*        pa_memblockq_get_nblocks(o->memblockq), */
564     /*        pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
565     /*        pa_memblockq_get_maxrewind(o->memblockq), */
566     /*        pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
567 
568     if (pa_memblockq_peek(o->memblockq, chunk) < 0)
569         return -1;
570 
571     pa_memblockq_drop(o->memblockq, chunk->length);
572 
573     return 0;
574 }
575 
576 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input * i,size_t nbytes)577 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
578     struct output *o;
579 
580     pa_sink_input_assert_ref(i);
581     pa_assert_se(o = i->userdata);
582 
583     pa_memblockq_rewind(o->memblockq, nbytes);
584 }
585 
586 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input * i,size_t nbytes)587 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
588     struct output *o;
589 
590     pa_sink_input_assert_ref(i);
591     pa_assert_se(o = i->userdata);
592 
593     pa_memblockq_set_maxrewind(o->memblockq, nbytes);
594 }
595 
596 /* Called from I/O thread context */
sink_input_update_max_request_cb(pa_sink_input * i,size_t nbytes)597 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
598     struct output *o;
599 
600     pa_sink_input_assert_ref(i);
601     pa_assert_se(o = i->userdata);
602 
603     if (pa_atomic_load(&o->max_request) == (int) nbytes)
604         return;
605 
606     pa_atomic_store(&o->max_request, (int) nbytes);
607     pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
608     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
609 }
610 
611 /* Called from thread context */
sink_input_update_sink_latency_range_cb(pa_sink_input * i)612 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
613     struct output *o;
614     pa_usec_t min, max, fix;
615 
616     pa_assert(i);
617 
618     pa_sink_input_assert_ref(i);
619     pa_assert_se(o = i->userdata);
620 
621     fix = i->sink->thread_info.fixed_latency;
622     if (fix > 0) {
623         min = fix;
624         max = fix;
625     } else {
626         min = i->sink->thread_info.min_latency;
627         max = i->sink->thread_info.max_latency;
628     }
629 
630     if ((pa_atomic_load(&o->min_latency) == (int) min) &&
631         (pa_atomic_load(&o->max_latency) == (int) max))
632         return;
633 
634     pa_atomic_store(&o->min_latency, (int) min);
635     pa_atomic_store(&o->max_latency, (int) max);
636     pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
637     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
638 }
639 
640 /* Called from I/O thread context */
sink_input_attach_cb(pa_sink_input * i)641 static void sink_input_attach_cb(pa_sink_input *i) {
642     struct output *o;
643     pa_usec_t fix, min, max;
644     size_t nbytes;
645 
646     pa_sink_input_assert_ref(i);
647     pa_assert_se(o = i->userdata);
648 
649     /* Set up the queue from the sink thread to us */
650     pa_assert(!o->audio_inq_rtpoll_item_read);
651     pa_assert(!o->control_inq_rtpoll_item_read);
652     pa_assert(!o->outq_rtpoll_item_write);
653 
654     o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
655             i->sink->thread_info.rtpoll,
656             PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
657             o->audio_inq);
658 
659     o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
660             i->sink->thread_info.rtpoll,
661             PA_RTPOLL_NORMAL,
662             o->control_inq);
663 
664     o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
665             i->sink->thread_info.rtpoll,
666             PA_RTPOLL_EARLY,
667             o->outq);
668 
669     pa_sink_input_request_rewind(i, 0, false, true, true);
670 
671     nbytes = pa_sink_input_get_max_request(i);
672     pa_atomic_store(&o->max_request, (int) nbytes);
673     pa_log_debug("attach max request %lu", (unsigned long) nbytes);
674 
675     fix = i->sink->thread_info.fixed_latency;
676     if (fix > 0) {
677         min = max = fix;
678     } else {
679         min = i->sink->thread_info.min_latency;
680         max = i->sink->thread_info.max_latency;
681     }
682     pa_atomic_store(&o->min_latency, (int) min);
683     pa_atomic_store(&o->max_latency, (int) max);
684     pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
685 
686     /* We register the output. That means that the sink will start to pass data to
687      * this output. */
688     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
689 }
690 
691 /* Called from I/O thread context */
sink_input_detach_cb(pa_sink_input * i)692 static void sink_input_detach_cb(pa_sink_input *i) {
693     struct output *o;
694 
695     pa_sink_input_assert_ref(i);
696     pa_assert_se(o = i->userdata);
697 
698     /* We unregister the output. That means that the sink doesn't
699      * pass any further data to this output */
700     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
701 
702     if (o->audio_inq_rtpoll_item_read) {
703         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
704         o->audio_inq_rtpoll_item_read = NULL;
705     }
706 
707     if (o->control_inq_rtpoll_item_read) {
708         pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
709         o->control_inq_rtpoll_item_read = NULL;
710     }
711 
712     if (o->outq_rtpoll_item_write) {
713         pa_rtpoll_item_free(o->outq_rtpoll_item_write);
714         o->outq_rtpoll_item_write = NULL;
715     }
716 
717 }
718 
719 /* Called from main context */
sink_input_kill_cb(pa_sink_input * i)720 static void sink_input_kill_cb(pa_sink_input *i) {
721     struct output *o;
722 
723     pa_sink_input_assert_ref(i);
724     pa_assert_se(o = i->userdata);
725 
726     pa_module_unload_request(o->userdata->module, true);
727     pa_idxset_remove_by_data(o->userdata->outputs, o, NULL);
728     output_free(o);
729 }
730 
731 /* Called from thread context */
sink_input_process_msg(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)732 static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
733     struct output *o = PA_SINK_INPUT(obj)->userdata;
734 
735     switch (code) {
736 
737         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
738             pa_usec_t *r = data;
739 
740             *r = pa_bytes_to_usec(pa_memblockq_get_length(o->memblockq), &o->sink_input->sample_spec);
741 
742             /* Fall through, the default handler will add in the extra
743              * latency added by the resampler */
744             break;
745         }
746 
747         case SINK_INPUT_MESSAGE_POST:
748 
749             if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) {
750                 pa_memblockq_push_align(o->memblockq, chunk);
751                 o->receive_counter += chunk->length;
752             } else
753                 pa_memblockq_flush_write(o->memblockq, true);
754 
755             return 0;
756 
757         case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
758             pa_usec_t latency = (pa_usec_t) offset;
759 
760             pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
761 
762             return 0;
763         }
764 
765         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
766             size_t length;
767 
768             length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq);
769 
770             o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq);
771 
772             /* Add content of memblockq's to sink latency */
773             o->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(o->sink, true) +
774                                                pa_bytes_to_usec(length, &o->sink->sample_spec);
775             /* Add resampler latency */
776             o->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(o->sink_input->thread_info.resampler);
777 
778             o->latency_snapshot.timestamp = pa_rtclock_now();
779 
780             o->latency_snapshot.receive_counter = o->receive_counter;
781 
782             return 0;
783         }
784     }
785 
786     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
787 }
788 
789 /* Called from main context */
suspend(struct userdata * u)790 static void suspend(struct userdata *u) {
791     struct output *o;
792     uint32_t idx;
793 
794     pa_assert(u);
795 
796     /* Let's suspend by unlinking all streams */
797     PA_IDXSET_FOREACH(o, u->outputs, idx)
798         output_disable(o);
799 
800     pa_log_info("Device suspended...");
801 }
802 
803 /* Called from main context */
unsuspend(struct userdata * u)804 static void unsuspend(struct userdata *u) {
805     struct output *o;
806     uint32_t idx;
807 
808     pa_assert(u);
809 
810     /* Let's resume */
811     PA_IDXSET_FOREACH(o, u->outputs, idx)
812         output_enable(o);
813 
814     pa_log_info("Resumed successfully...");
815 }
816 
817 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink * sink,pa_sink_state_t state,pa_suspend_cause_t suspend_cause)818 static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
819     struct userdata *u;
820 
821     pa_sink_assert_ref(sink);
822     pa_assert_se(u = sink->userdata);
823 
824     /* It may be that only the suspend cause is changing, in which
825      * case there's nothing to do. */
826     if (state == u->sink->state)
827         return 0;
828 
829     /* Please note that in contrast to the ALSA modules we call
830      * suspend/unsuspend from main context here! */
831 
832     switch (state) {
833         case PA_SINK_SUSPENDED:
834             pa_assert(PA_SINK_IS_OPENED(u->sink->state));
835 
836             suspend(u);
837             break;
838 
839         case PA_SINK_IDLE:
840         case PA_SINK_RUNNING:
841 
842             if (u->sink->state == PA_SINK_SUSPENDED)
843                 unsuspend(u);
844 
845             /* The first smoother update should be done early, otherwise the smoother will
846              * not be aware of the slave sink latencies and report far too small values.
847              * This is especially important if after an unsuspend the sink runs on a different
848              * latency than before. */
849             if (state == PA_SINK_RUNNING && !u->time_event && u->adjust_time > 0)
850                 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + pa_sink_get_requested_latency(u->sink), time_callback, u);
851 
852             break;
853 
854         case PA_SINK_UNLINKED:
855         case PA_SINK_INIT:
856         case PA_SINK_INVALID_STATE:
857             ;
858     }
859 
860     return 0;
861 }
862 
863 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)864 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
865     struct userdata *u;
866     bool running;
867 
868     pa_assert(s);
869     pa_assert_se(u = s->userdata);
870 
871     /* It may be that only the suspend cause is changing, in which case there's
872      * nothing to do. */
873     if (new_state == s->thread_info.state)
874         return 0;
875 
876     running = new_state == PA_SINK_RUNNING;
877     pa_atomic_store(&u->thread_info.running, running);
878 
879     if (running) {
880         u->thread_info.render_timestamp = 0;
881 #ifdef USE_SMOOTHER_2
882         pa_smoother_2_resume(u->thread_info.smoother, pa_rtclock_now());
883     } else
884         pa_smoother_2_pause(u->thread_info.smoother, pa_rtclock_now());
885 #else
886         pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
887     } else
888         pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
889 #endif
890 
891     return 0;
892 }
893 
894 /* Called from IO context */
update_max_request(struct userdata * u)895 static void update_max_request(struct userdata *u) {
896     size_t max_request = 0;
897     struct output *o;
898 
899     pa_assert(u);
900     pa_sink_assert_io_context(u->sink);
901 
902     /* Collects the max_request values of all streams and sets the
903      * largest one locally */
904 
905     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
906         size_t mr = (size_t) pa_atomic_load(&o->max_request);
907 
908         if (mr > max_request)
909             max_request = mr;
910     }
911 
912     if (max_request <= 0)
913         max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
914 
915     pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
916     pa_sink_set_max_request_within_thread(u->sink, max_request);
917 }
918 
919 /* Called from IO context */
update_latency_range(struct userdata * u)920 static void update_latency_range(struct userdata *u) {
921     pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
922     struct output *o;
923 
924     pa_assert(u);
925     pa_sink_assert_io_context(u->sink);
926 
927     /* Collects the latency_range values of all streams and sets
928      * the max of min and min of max locally */
929     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
930         pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
931         pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
932 
933         if (min > min_latency)
934             min_latency = min;
935         if (max_latency == (pa_usec_t) -1 || max < max_latency)
936             max_latency = max;
937     }
938     if (max_latency == (pa_usec_t) -1) {
939         /* No outputs, use default limits. */
940         min_latency = u->default_min_latency;
941         max_latency = u->default_max_latency;
942     }
943 
944     /* As long as we don't support rewinding, we should limit the max latency
945      * to a conservative value. */
946     if (max_latency > u->default_max_latency)
947         max_latency = u->default_max_latency;
948 
949     /* Never ever try to set lower max latency than min latency, it just
950      * doesn't make sense. */
951     if (max_latency < min_latency)
952         max_latency = min_latency;
953 
954     pa_log_debug("Sink update latency range %" PRIu64 " %" PRIu64, min_latency, max_latency);
955     pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
956 }
957 
958 /* Called from thread context of the io thread */
output_add_within_thread(struct output * o)959 static void output_add_within_thread(struct output *o) {
960     pa_assert(o);
961     pa_sink_assert_io_context(o->sink);
962 
963     PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
964 
965     pa_assert(!o->outq_rtpoll_item_read);
966     pa_assert(!o->audio_inq_rtpoll_item_write);
967     pa_assert(!o->control_inq_rtpoll_item_write);
968 
969     o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
970             o->userdata->rtpoll,
971             PA_RTPOLL_EARLY-1,  /* This item is very important */
972             o->outq);
973     o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
974             o->userdata->rtpoll,
975             PA_RTPOLL_EARLY,
976             o->audio_inq);
977     o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
978             o->userdata->rtpoll,
979             PA_RTPOLL_NORMAL,
980             o->control_inq);
981     o->receive_counter = o->userdata->thread_info.counter;
982 }
983 
984 /* Called from thread context of the io thread */
output_remove_within_thread(struct output * o)985 static void output_remove_within_thread(struct output *o) {
986     pa_assert(o);
987     pa_sink_assert_io_context(o->sink);
988 
989     PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
990 
991     if (o->outq_rtpoll_item_read) {
992         pa_rtpoll_item_free(o->outq_rtpoll_item_read);
993         o->outq_rtpoll_item_read = NULL;
994     }
995 
996     if (o->audio_inq_rtpoll_item_write) {
997         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
998         o->audio_inq_rtpoll_item_write = NULL;
999     }
1000 
1001     if (o->control_inq_rtpoll_item_write) {
1002         pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1003         o->control_inq_rtpoll_item_write = NULL;
1004     }
1005 }
1006 
1007 /* Called from sink I/O thread context */
sink_update_requested_latency(pa_sink * s)1008 static void sink_update_requested_latency(pa_sink *s) {
1009     struct userdata *u;
1010     struct output *o;
1011 
1012     pa_sink_assert_ref(s);
1013     pa_assert_se(u = s->userdata);
1014 
1015     u->block_usec = pa_sink_get_requested_latency_within_thread(s);
1016 
1017     if (u->block_usec == (pa_usec_t) -1)
1018         u->block_usec = s->thread_info.max_latency;
1019 
1020     pa_log_debug("Sink update requested latency %0.2f", (double) u->block_usec / PA_USEC_PER_MSEC);
1021 
1022     /* Just hand this one over to all sink_inputs */
1023     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
1024         pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
1025                           u->block_usec, NULL, NULL);
1026     }
1027 }
1028 
1029 
1030 /* Called from thread context of the io thread */
sink_process_msg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)1031 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1032     struct userdata *u = PA_SINK(o)->userdata;
1033 
1034     switch (code) {
1035 
1036         case PA_SINK_MESSAGE_GET_LATENCY: {
1037             int64_t *delay = data;
1038 
1039 #ifdef USE_SMOOTHER_2
1040             *delay = pa_smoother_2_get_delay(u->thread_info.smoother, pa_rtclock_now(), u->thread_info.counter);
1041 #else
1042             pa_usec_t x, y, c;
1043 
1044             x = pa_rtclock_now();
1045             y = pa_smoother_get(u->thread_info.smoother, x);
1046 
1047             c = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec);
1048 
1049             *delay = (int64_t)c - y;
1050 #endif
1051 
1052             return 0;
1053         }
1054 
1055         case SINK_MESSAGE_ADD_OUTPUT:
1056             output_add_within_thread(data);
1057             update_max_request(u);
1058             update_latency_range(u);
1059             return 0;
1060 
1061         case SINK_MESSAGE_REMOVE_OUTPUT:
1062             output_remove_within_thread(data);
1063             update_max_request(u);
1064             update_latency_range(u);
1065             return 0;
1066 
1067         case SINK_MESSAGE_NEED:
1068             render_memblock(u, (struct output*) data, (size_t) offset);
1069             return 0;
1070 
1071         case SINK_MESSAGE_UPDATE_LATENCY: {
1072 #ifdef USE_SMOOTHER_2
1073             size_t latency;
1074 
1075             latency = pa_usec_to_bytes((pa_usec_t)offset,  &u->sink->sample_spec);
1076             pa_smoother_2_put(u->thread_info.smoother, u->thread_info.snapshot_time, (int64_t)u->thread_info.snapshot_counter - latency);
1077 #else
1078             pa_usec_t x, y, latency = (pa_usec_t) offset;
1079 
1080             /* It may be possible that thread_info.counter has been increased
1081              * since we took the snapshot. Therefore we have to use the snapshot
1082              * time and counter instead of the current values. */
1083             x = u->thread_info.snapshot_time;
1084             y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec);
1085 
1086             if (y > latency)
1087                 y -= latency;
1088             else
1089                 y = 0;
1090 
1091             pa_smoother_put(u->thread_info.smoother, x, y);
1092 #endif
1093             return 0;
1094         }
1095 
1096         case SINK_MESSAGE_GET_SNAPSHOT: {
1097             struct sink_snapshot *rdata = data;
1098 
1099             rdata->timestamp = u->thread_info.render_timestamp;
1100             rdata->send_counter = u->thread_info.counter;
1101             u->thread_info.snapshot_counter = u->thread_info.counter;
1102             u->thread_info.snapshot_time = u->thread_info.render_timestamp;
1103 
1104             return 0;
1105         }
1106 
1107         case SINK_MESSAGE_UPDATE_MAX_REQUEST:
1108             update_max_request(u);
1109             break;
1110 
1111         case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
1112             update_latency_range(u);
1113             break;
1114 
1115 }
1116 
1117     return pa_sink_process_msg(o, code, data, offset, chunk);
1118 }
1119 
update_description(struct userdata * u)1120 static void update_description(struct userdata *u) {
1121     bool first = true;
1122     char *t;
1123     struct output *o;
1124     uint32_t idx;
1125 
1126     pa_assert(u);
1127 
1128     if (!u->auto_desc)
1129         return;
1130 
1131     if (pa_idxset_isempty(u->outputs)) {
1132         pa_sink_set_description(u->sink, "Simultaneous output");
1133         return;
1134     }
1135 
1136     t = pa_xstrdup("Simultaneous output to");
1137 
1138     PA_IDXSET_FOREACH(o, u->outputs, idx) {
1139         char *e;
1140 
1141         if (first) {
1142             e = pa_sprintf_malloc("%s %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1143             first = false;
1144         } else
1145             e = pa_sprintf_malloc("%s, %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1146 
1147         pa_xfree(t);
1148         t = e;
1149     }
1150 
1151     pa_sink_set_description(u->sink, t);
1152     pa_xfree(t);
1153 }
1154 
output_create_sink_input(struct output * o)1155 static int output_create_sink_input(struct output *o) {
1156     struct userdata *u;
1157     pa_sink_input_new_data data;
1158 
1159     pa_assert(o);
1160 
1161     if (o->sink_input)
1162         return 0;
1163 
1164     u = o->userdata;
1165 
1166     pa_sink_input_new_data_init(&data);
1167     pa_sink_input_new_data_set_sink(&data, o->sink, false, true);
1168     data.driver = __FILE__;
1169     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, "Simultaneous output on %s", pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1170     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1171     pa_sink_input_new_data_set_sample_spec(&data, &u->sink->sample_spec);
1172     pa_sink_input_new_data_set_channel_map(&data, &u->sink->channel_map);
1173     data.module = u->module;
1174     data.resample_method = u->resample_method;
1175     data.flags = PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND;
1176     data.origin_sink = u->sink;
1177 
1178     if (!u->remix)
1179         data.flags |= PA_SINK_INPUT_NO_REMIX;
1180 
1181     pa_sink_input_new(&o->sink_input, u->core, &data);
1182 
1183     pa_sink_input_new_data_done(&data);
1184 
1185     if (!o->sink_input)
1186         return -1;
1187 
1188     o->sink_input->parent.process_msg = sink_input_process_msg;
1189     o->sink_input->pop = sink_input_pop_cb;
1190     o->sink_input->process_rewind = sink_input_process_rewind_cb;
1191     o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1192     o->sink_input->update_max_request = sink_input_update_max_request_cb;
1193     o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1194     o->sink_input->attach = sink_input_attach_cb;
1195     o->sink_input->detach = sink_input_detach_cb;
1196     o->sink_input->kill = sink_input_kill_cb;
1197     o->sink_input->userdata = o;
1198 
1199     pa_sink_input_set_requested_latency(o->sink_input, pa_sink_get_requested_latency(u->sink));
1200 
1201     return 0;
1202 }
1203 
1204 /* Called from main context */
output_new(struct userdata * u,pa_sink * sink)1205 static struct output *output_new(struct userdata *u, pa_sink *sink) {
1206     struct output *o;
1207 
1208     pa_assert(u);
1209     pa_assert(sink);
1210     pa_assert(u->sink);
1211 
1212     o = pa_xnew0(struct output, 1);
1213     o->userdata = u;
1214 
1215     o->audio_inq = pa_asyncmsgq_new(0);
1216     if (!o->audio_inq) {
1217         pa_log("pa_asyncmsgq_new() failed.");
1218         goto fail;
1219     }
1220 
1221     o->control_inq = pa_asyncmsgq_new(0);
1222     if (!o->control_inq) {
1223         pa_log("pa_asyncmsgq_new() failed.");
1224         goto fail;
1225     }
1226 
1227     o->outq = pa_asyncmsgq_new(0);
1228     if (!o->outq) {
1229         pa_log("pa_asyncmsgq_new() failed.");
1230         goto fail;
1231     }
1232 
1233     o->sink = sink;
1234     o->memblockq = pa_memblockq_new(
1235             "module-combine-sink output memblockq",
1236             0,
1237             MEMBLOCKQ_MAXLENGTH,
1238             MEMBLOCKQ_MAXLENGTH,
1239             &u->sink->sample_spec,
1240             1,
1241             0,
1242             0,
1243             &u->sink->silence);
1244 
1245     pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
1246     update_description(u);
1247 
1248     return o;
1249 
1250 fail:
1251     output_free(o);
1252 
1253     return NULL;
1254 }
1255 
1256 /* Called from main context */
output_free(struct output * o)1257 static void output_free(struct output *o) {
1258     pa_assert(o);
1259 
1260     output_disable(o);
1261     update_description(o->userdata);
1262 
1263     if (o->audio_inq_rtpoll_item_read)
1264         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
1265     if (o->audio_inq_rtpoll_item_write)
1266         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
1267 
1268     if (o->control_inq_rtpoll_item_read)
1269         pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
1270     if (o->control_inq_rtpoll_item_write)
1271         pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1272 
1273     if (o->outq_rtpoll_item_read)
1274         pa_rtpoll_item_free(o->outq_rtpoll_item_read);
1275     if (o->outq_rtpoll_item_write)
1276         pa_rtpoll_item_free(o->outq_rtpoll_item_write);
1277 
1278     if (o->audio_inq)
1279         pa_asyncmsgq_unref(o->audio_inq);
1280 
1281     if (o->control_inq)
1282         pa_asyncmsgq_unref(o->control_inq);
1283 
1284     if (o->outq)
1285         pa_asyncmsgq_unref(o->outq);
1286 
1287     if (o->memblockq)
1288         pa_memblockq_free(o->memblockq);
1289 
1290     pa_xfree(o);
1291 }
1292 
1293 /* Called from main context */
output_enable(struct output * o)1294 static void output_enable(struct output *o) {
1295     pa_assert(o);
1296 
1297     if (o->sink_input)
1298         return;
1299 
1300     /* This might cause the sink to be resumed. The state change hook
1301      * of the sink might hence be called from here, which might then
1302      * cause us to be called in a loop. Make sure that state changes
1303      * for this output don't cause this loop by setting a flag here */
1304     o->ignore_state_change = true;
1305 
1306     if (output_create_sink_input(o) >= 0) {
1307 
1308         if (o->sink->state != PA_SINK_INIT) {
1309             /* Enable the sink input. That means that the sink
1310              * is now asked for new data. */
1311             pa_sink_input_put(o->sink_input);
1312         }
1313     }
1314 
1315     o->ignore_state_change = false;
1316 }
1317 
1318 /* Called from main context */
output_disable(struct output * o)1319 static void output_disable(struct output *o) {
1320     pa_assert(o);
1321 
1322     if (!o->sink_input)
1323         return;
1324 
1325     /* We disable the sink input. That means that the sink is
1326      * not asked for new data anymore  */
1327     pa_sink_input_unlink(o->sink_input);
1328 
1329     /* Now deallocate the stream */
1330     pa_sink_input_unref(o->sink_input);
1331     o->sink_input = NULL;
1332 
1333     /* Finally, drop all queued data */
1334     pa_memblockq_flush_write(o->memblockq, true);
1335     pa_asyncmsgq_flush(o->audio_inq, false);
1336     pa_asyncmsgq_flush(o->control_inq, false);
1337     pa_asyncmsgq_flush(o->outq, false);
1338 }
1339 
1340 /* Called from main context */
output_verify(struct output * o)1341 static void output_verify(struct output *o) {
1342     pa_assert(o);
1343 
1344     if (PA_SINK_IS_OPENED(o->userdata->sink->state))
1345         output_enable(o);
1346     else
1347         output_disable(o);
1348 }
1349 
1350 /* Called from main context */
is_suitable_sink(struct userdata * u,pa_sink * s)1351 static bool is_suitable_sink(struct userdata *u, pa_sink *s) {
1352     const char *t;
1353 
1354     pa_sink_assert_ref(s);
1355 
1356     if (s == u->sink)
1357         return false;
1358 
1359     if (!(s->flags & PA_SINK_HARDWARE))
1360         return false;
1361 
1362     if (!(s->flags & PA_SINK_LATENCY))
1363         return false;
1364 
1365     if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
1366         if (!pa_streq(t, "sound"))
1367             return false;
1368 
1369     return true;
1370 }
1371 
1372 /* Called from main context */
sink_put_hook_cb(pa_core * c,pa_sink * s,struct userdata * u)1373 static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1374     struct output *o;
1375 
1376     pa_core_assert_ref(c);
1377     pa_sink_assert_ref(s);
1378     pa_assert(u);
1379 
1380     if (u->automatic) {
1381         if (!is_suitable_sink(u, s))
1382             return PA_HOOK_OK;
1383     } else {
1384         /* Check if the sink is a previously unlinked slave (non-automatic mode) */
1385         pa_strlist *l = u->unlinked_slaves;
1386 
1387         while (l && !pa_streq(pa_strlist_data(l), s->name))
1388             l = pa_strlist_next(l);
1389 
1390         if (!l)
1391             return PA_HOOK_OK;
1392 
1393         u->unlinked_slaves = pa_strlist_remove(u->unlinked_slaves, s->name);
1394     }
1395 
1396     pa_log_info("Configuring new sink: %s", s->name);
1397     if (!(o = output_new(u, s))) {
1398         pa_log("Failed to create sink input on sink '%s'.", s->name);
1399         return PA_HOOK_OK;
1400     }
1401 
1402     output_verify(o);
1403 
1404     return PA_HOOK_OK;
1405 }
1406 
1407 /* Called from main context */
find_output(struct userdata * u,pa_sink * s)1408 static struct output* find_output(struct userdata *u, pa_sink *s) {
1409     struct output *o;
1410     uint32_t idx;
1411 
1412     pa_assert(u);
1413     pa_assert(s);
1414 
1415     if (u->sink == s)
1416         return NULL;
1417 
1418     PA_IDXSET_FOREACH(o, u->outputs, idx)
1419         if (o->sink == s)
1420             return o;
1421 
1422     return NULL;
1423 }
1424 
1425 /* Called from main context */
sink_unlink_hook_cb(pa_core * c,pa_sink * s,struct userdata * u)1426 static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1427     struct output *o;
1428 
1429     pa_assert(c);
1430     pa_sink_assert_ref(s);
1431     pa_assert(u);
1432 
1433     if (!(o = find_output(u, s)))
1434         return PA_HOOK_OK;
1435 
1436     pa_log_info("Unconfiguring sink: %s", s->name);
1437 
1438     if (!u->automatic)
1439         u->unlinked_slaves = pa_strlist_prepend(u->unlinked_slaves, s->name);
1440 
1441     pa_idxset_remove_by_data(u->outputs, o, NULL);
1442     output_free(o);
1443 
1444     return PA_HOOK_OK;
1445 }
1446 
1447 /* Called from main context */
sink_state_changed_hook_cb(pa_core * c,pa_sink * s,struct userdata * u)1448 static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1449     struct output *o;
1450 
1451     if (!(o = find_output(u, s)))
1452         return PA_HOOK_OK;
1453 
1454     /* This state change might be triggered because we are creating a
1455      * stream here, in that case we don't want to create it a second
1456      * time here and enter a loop */
1457     if (o->ignore_state_change)
1458         return PA_HOOK_OK;
1459 
1460     output_verify(o);
1461 
1462     return PA_HOOK_OK;
1463 }
1464 
pa__init(pa_module * m)1465 int pa__init(pa_module*m) {
1466     struct userdata *u;
1467     pa_modargs *ma = NULL;
1468     const char *slaves, *rm;
1469     int resample_method;
1470     pa_sample_spec ss;
1471     pa_channel_map map;
1472     struct output *o;
1473     uint32_t idx;
1474     pa_sink_new_data data;
1475     uint32_t adjust_time_sec;
1476     size_t nbytes;
1477 
1478     pa_assert(m);
1479 
1480     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1481         pa_log("failed to parse module arguments");
1482         goto fail;
1483     }
1484 
1485     resample_method = m->core->resample_method;
1486     if ((rm = pa_modargs_get_value(ma, "resample_method", NULL))) {
1487         if ((resample_method = pa_parse_resample_method(rm)) < 0) {
1488             pa_log("invalid resample method '%s'", rm);
1489             goto fail;
1490         }
1491     }
1492 
1493     m->userdata = u = pa_xnew0(struct userdata, 1);
1494     u->core = m->core;
1495     u->module = m;
1496     u->rtpoll = pa_rtpoll_new();
1497 
1498     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
1499         pa_log("pa_thread_mq_init() failed.");
1500         goto fail;
1501     }
1502 
1503     u->remix = !m->core->disable_remixing;
1504     if (pa_modargs_get_value_boolean(ma, "remix", &u->remix) < 0) {
1505         pa_log("Invalid boolean remix parameter");
1506         goto fail;
1507     }
1508 
1509     u->resample_method = resample_method;
1510     u->outputs = pa_idxset_new(NULL, NULL);
1511 #ifndef USE_SMOOTHER_2
1512     u->thread_info.smoother = pa_smoother_new(
1513             PA_USEC_PER_SEC,
1514             PA_USEC_PER_SEC*2,
1515             true,
1516             true,
1517             10,
1518             pa_rtclock_now(),
1519             true);
1520 #endif
1521 
1522     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1523     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1524         pa_log("Failed to parse adjust_time value");
1525         goto fail;
1526     }
1527 
1528     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1529         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1530     else
1531         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1532 
1533     slaves = pa_modargs_get_value(ma, "slaves", NULL);
1534     u->automatic = !slaves;
1535 
1536     ss = m->core->default_sample_spec;
1537     map = m->core->default_channel_map;
1538 
1539     /* Check the specified slave sinks for sample_spec and channel_map to use for the combined sink */
1540     if (!u->automatic) {
1541         const char*split_state = NULL;
1542         char *n = NULL;
1543         pa_sample_spec slaves_spec;
1544         pa_channel_map slaves_map;
1545         bool is_first_slave = true;
1546 
1547         pa_sample_spec_init(&slaves_spec);
1548 
1549         while ((n = pa_split(slaves, ",", &split_state))) {
1550             pa_sink *slave_sink;
1551 
1552             if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1553                 pa_log("Invalid slave sink '%s'", n);
1554                 pa_xfree(n);
1555                 goto fail;
1556             }
1557 
1558             pa_xfree(n);
1559 
1560             if (is_first_slave) {
1561                 slaves_spec = slave_sink->sample_spec;
1562                 slaves_map = slave_sink->channel_map;
1563                 is_first_slave = false;
1564             } else {
1565                 if (slaves_spec.format != slave_sink->sample_spec.format)
1566                     slaves_spec.format = PA_SAMPLE_INVALID;
1567 
1568                 if (slaves_spec.rate < slave_sink->sample_spec.rate)
1569                     slaves_spec.rate = slave_sink->sample_spec.rate;
1570 
1571                 if (!pa_channel_map_equal(&slaves_map, &slave_sink->channel_map))
1572                     slaves_spec.channels = 0;
1573             }
1574         }
1575 
1576         if (!is_first_slave) {
1577             if (slaves_spec.format != PA_SAMPLE_INVALID)
1578                 ss.format = slaves_spec.format;
1579 
1580             ss.rate = slaves_spec.rate;
1581 
1582             if (slaves_spec.channels > 0) {
1583                 map = slaves_map;
1584                 ss.channels = slaves_map.channels;
1585             }
1586         }
1587     }
1588 
1589     if ((pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0)) {
1590         pa_log("Invalid sample specification.");
1591         goto fail;
1592     }
1593 
1594     pa_sink_new_data_init(&data);
1595     data.namereg_fail = false;
1596     data.driver = __FILE__;
1597     data.module = m;
1598     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
1599     pa_sink_new_data_set_sample_spec(&data, &ss);
1600     pa_sink_new_data_set_channel_map(&data, &map);
1601     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1602 
1603     if (slaves)
1604         pa_proplist_sets(data.proplist, "combine.slaves", slaves);
1605 
1606     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1607         pa_log("Invalid properties");
1608         pa_sink_new_data_done(&data);
1609         goto fail;
1610     }
1611 
1612     /* Check proplist for a description & fill in a default value if not */
1613     u->auto_desc = false;
1614     if (NULL == pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION)) {
1615         u->auto_desc = true;
1616         pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
1617     }
1618 
1619     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
1620     pa_sink_new_data_done(&data);
1621 
1622     if (!u->sink) {
1623         pa_log("Failed to create sink");
1624         goto fail;
1625     }
1626 
1627 #ifdef USE_SMOOTHER_2
1628     /* The smoother window size needs to be larger than the time between updates */
1629     u->thread_info.smoother = pa_smoother_2_new(u->adjust_time + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sink->sample_spec), u->sink->sample_spec.rate);
1630 #endif
1631 
1632     u->sink->parent.process_msg = sink_process_msg;
1633     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
1634     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
1635     u->sink->update_requested_latency = sink_update_requested_latency;
1636     u->sink->userdata = u;
1637 
1638     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1639     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1640 
1641     nbytes = pa_usec_to_bytes(BLOCK_USEC, &u->sink->sample_spec);
1642     pa_sink_set_max_request(u->sink, nbytes);
1643     pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC);
1644     /* pulse clamps the range, get the real values */
1645     u->default_min_latency = u->sink->thread_info.min_latency;
1646     u->default_max_latency = u->sink->thread_info.max_latency;
1647     u->block_usec = u->sink->thread_info.max_latency;
1648 
1649 
1650     if (!u->automatic) {
1651         const char*split_state;
1652         char *n = NULL;
1653         pa_assert(slaves);
1654 
1655         /* The slaves have been specified manually */
1656 
1657         split_state = NULL;
1658         while ((n = pa_split(slaves, ",", &split_state))) {
1659             pa_sink *slave_sink;
1660 
1661             if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK)) || slave_sink == u->sink) {
1662                 pa_log("Invalid slave sink '%s'", n);
1663                 pa_xfree(n);
1664                 goto fail;
1665             }
1666 
1667             pa_xfree(n);
1668 
1669             if (!output_new(u, slave_sink)) {
1670                 pa_log("Failed to create slave sink input on sink '%s'.", slave_sink->name);
1671                 goto fail;
1672             }
1673         }
1674 
1675         if (pa_idxset_size(u->outputs) <= 1)
1676             pa_log_warn("No slave sinks specified.");
1677 
1678         u->sink_put_slot = NULL;
1679 
1680     } else {
1681         pa_sink *s;
1682 
1683         /* We're in automatic mode, we add every sink that matches our needs  */
1684 
1685         PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
1686 
1687             if (!is_suitable_sink(u, s))
1688                 continue;
1689 
1690             if (!output_new(u, s)) {
1691                 pa_log("Failed to create sink input on sink '%s'.", s->name);
1692                 goto fail;
1693             }
1694         }
1695     }
1696 
1697     u->sink_put_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PUT], PA_HOOK_LATE, (pa_hook_cb_t) sink_put_hook_cb, u);
1698     u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u);
1699     u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u);
1700 
1701     u->thread_info.render_timestamp = 0;
1702 
1703     if (!(u->thread = pa_thread_new("combine", thread_func, u))) {
1704         pa_log("Failed to create thread.");
1705         goto fail;
1706     }
1707 
1708     /* Activate the sink and the sink inputs */
1709     pa_sink_put(u->sink);
1710 
1711     PA_IDXSET_FOREACH(o, u->outputs, idx)
1712         output_verify(o);
1713 
1714     pa_modargs_free(ma);
1715 
1716     return 0;
1717 
1718 fail:
1719 
1720     if (ma)
1721         pa_modargs_free(ma);
1722 
1723     pa__done(m);
1724 
1725     return -1;
1726 }
1727 
pa__done(pa_module * m)1728 void pa__done(pa_module*m) {
1729     struct userdata *u;
1730 
1731     pa_assert(m);
1732 
1733     if (!(u = m->userdata))
1734         return;
1735 
1736     if (u->sink && PA_SINK_IS_LINKED(u->sink->state))
1737         pa_sink_suspend(u->sink, true, PA_SUSPEND_UNAVAILABLE);
1738 
1739     pa_strlist_free(u->unlinked_slaves);
1740 
1741     if (u->sink_put_slot)
1742         pa_hook_slot_free(u->sink_put_slot);
1743 
1744     if (u->sink_unlink_slot)
1745         pa_hook_slot_free(u->sink_unlink_slot);
1746 
1747     if (u->sink_state_changed_slot)
1748         pa_hook_slot_free(u->sink_state_changed_slot);
1749 
1750     if (u->outputs)
1751         pa_idxset_free(u->outputs, (pa_free_cb_t) output_free);
1752 
1753     if (u->sink)
1754         pa_sink_unlink(u->sink);
1755 
1756     if (u->thread) {
1757         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1758         pa_thread_free(u->thread);
1759     }
1760 
1761     pa_thread_mq_done(&u->thread_mq);
1762 
1763     if (u->sink)
1764         pa_sink_unref(u->sink);
1765 
1766     if (u->rtpoll)
1767         pa_rtpoll_free(u->rtpoll);
1768 
1769     if (u->time_event)
1770         u->core->mainloop->time_free(u->time_event);
1771 
1772     if (u->thread_info.smoother)
1773 #ifdef USE_SMOOTHER_2
1774         pa_smoother_2_free(u->thread_info.smoother);
1775 #else
1776         pa_smoother_free(u->thread_info.smoother);
1777 #endif
1778 
1779     pa_xfree(u);
1780 }
1781