1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2009 Intel Corporation
5     Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6 
7     PulseAudio is free software; you can redistribute it and/or modify
8     it under the terms of the GNU Lesser General Public License as published
9     by the Free Software Foundation; either version 2.1 of the License,
10     or (at your option) any later version.
11 
12     PulseAudio is distributed in the hope that it will be useful, but
13     WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15     General Public License for more details.
16 
17     You should have received a copy of the GNU Lesser General Public License
18     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 
27 #include <pulse/xmalloc.h>
28 
29 #include <pulsecore/sink-input.h>
30 #include <pulsecore/module.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/namereg.h>
33 #include <pulsecore/log.h>
34 #include <pulsecore/core-util.h>
35 
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 
39 PA_MODULE_AUTHOR("Pierre-Louis Bossart");
40 PA_MODULE_DESCRIPTION("Loopback from source to sink");
41 PA_MODULE_VERSION(PACKAGE_VERSION);
42 PA_MODULE_LOAD_ONCE(false);
43 PA_MODULE_USAGE(
44         "source=<source to connect to> "
45         "sink=<sink to connect to> "
46         "adjust_time=<how often to readjust rates in s> "
47         "latency_msec=<latency in ms> "
48         "max_latency_msec=<maximum latency in ms> "
49         "fast_adjust_threshold_msec=<threshold for fast adjust in ms> "
50         "format=<sample format> "
51         "rate=<sample rate> "
52         "channels=<number of channels> "
53         "channel_map=<channel map> "
54         "sink_input_properties=<proplist> "
55         "source_output_properties=<proplist> "
56         "source_dont_move=<boolean> "
57         "sink_dont_move=<boolean> "
58         "remix=<remix channels?> ");
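/* Illustrative usage (not part of this file): the arguments above map directly
 * to a module load line, e.g. in default.pa or via pactl:
 *
 *   load-module module-loopback source=<source name> sink=<sink name> latency_msec=50
 *
 * Unspecified arguments fall back to the defaults below. */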
59 
60 #define DEFAULT_LATENCY_MSEC 200
61 
62 #define MEMBLOCKQ_MAXLENGTH (1024*1024*32)
63 
64 #define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)
65 
66 #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC)
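/* Taken together, the defaults above mean that, unless overridden on the
 * command line, the module aims for roughly 200 ms of end-to-end latency and
 * re-evaluates the resampling rate about every 10 seconds. */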
67 
68 typedef struct loopback_msg loopback_msg;
69 
70 struct userdata {
71     pa_core *core;
72     pa_module *module;
73 
74     loopback_msg *msg;
75 
76     pa_sink_input *sink_input;
77     pa_source_output *source_output;
78 
79     pa_asyncmsgq *asyncmsgq;
80     pa_memblockq *memblockq;
81 
82     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
83 
84     pa_time_event *time_event;
85 
86     /* Variables used to calculate the average time between
87      * subsequent calls of adjust_rates() */
88     pa_usec_t adjust_time_stamp;
89     pa_usec_t real_adjust_time;
90     pa_usec_t real_adjust_time_sum;
91 
92     /* Values from command line configuration */
93     pa_usec_t latency;
94     pa_usec_t max_latency;
95     pa_usec_t adjust_time;
96     pa_usec_t fast_adjust_threshold;
97 
98     /* Latency boundaries and current values */
99     pa_usec_t min_source_latency;
100     pa_usec_t max_source_latency;
101     pa_usec_t min_sink_latency;
102     pa_usec_t max_sink_latency;
103     pa_usec_t configured_sink_latency;
104     pa_usec_t configured_source_latency;
105     int64_t source_latency_offset;
106     int64_t sink_latency_offset;
107     pa_usec_t minimum_latency;
108 
109     /* lower latency limit found by underruns */
110     pa_usec_t underrun_latency_limit;
111 
112     /* Various counters */
113     uint32_t iteration_counter;
114     uint32_t underrun_counter;
115     uint32_t adjust_counter;
116 
117     bool fixed_alsa_source;
118     bool source_sink_changed;
119 
120     /* Used for sink input and source output snapshots */
121     struct {
122         int64_t send_counter;
123         int64_t source_latency;
124         pa_usec_t source_timestamp;
125 
126         int64_t recv_counter;
127         size_t loopback_memblockq_length;
128         int64_t sink_latency;
129         pa_usec_t sink_timestamp;
130     } latency_snapshot;
131 
132     /* Input thread variable */
133     int64_t send_counter;
134 
135     /* Output thread variables */
136     struct {
137         int64_t recv_counter;
138         pa_usec_t effective_source_latency;
139 
140         /* Copied from main thread */
141         pa_usec_t minimum_latency;
142 
143         /* Various booleans */
144         bool in_pop;
145         bool pop_called;
146         bool pop_adjust;
147         bool first_pop_done;
148         bool push_called;
149     } output_thread_info;
150 };
151 
152 struct loopback_msg {
153     pa_msgobject parent;
154     struct userdata *userdata;
155 };
156 
157 PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
158 #define LOOPBACK_MSG(o) (loopback_msg_cast(o))
159 
160 static const char* const valid_modargs[] = {
161     "source",
162     "sink",
163     "adjust_time",
164     "latency_msec",
165     "max_latency_msec",
166     "fast_adjust_threshold_msec",
167     "format",
168     "rate",
169     "channels",
170     "channel_map",
171     "sink_input_properties",
172     "source_output_properties",
173     "source_dont_move",
174     "sink_dont_move",
175     "remix",
176     NULL,
177 };
178 
179 enum {
180     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
181     SINK_INPUT_MESSAGE_REWIND,
182     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
183     SINK_INPUT_MESSAGE_SOURCE_CHANGED,
184     SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
185     SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
186     SINK_INPUT_MESSAGE_FAST_ADJUST,
187 };
188 
189 enum {
190     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
191 };
192 
193 enum {
194     LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
195     LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
196     LOOPBACK_MESSAGE_UNDERRUN,
197 };
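/* Rough overview of the message flow: the source output (input thread) posts
 * POST and REWIND messages to the sink input through u->asyncmsgq, the main
 * thread sends the snapshot and configuration messages through the sink's or
 * source's asyncmsgq, and the IO threads report latency range changes and
 * underruns back to the main thread using the LOOPBACK_MESSAGE_* codes. */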
198 
199 static void enable_adjust_timer(struct userdata *u, bool enable);
200 
201 /* Called from main context */
202 static void teardown(struct userdata *u) {
203     pa_assert(u);
204     pa_assert_ctl_context();
205 
206     u->adjust_time = 0;
207     enable_adjust_timer(u, false);
208 
209     /* Handling the asyncmsgq between the source output and the sink input
210      * requires some care. When the source output is unlinked, nothing needs
211      * to be done for the asyncmsgq, because the source output is the sending
212      * end. But when the sink input is unlinked, we should ensure that the
213      * asyncmsgq is emptied, because the messages in the queue hold references
214      * to the sink input. Also, we need to ensure that new messages won't be
215      * written to the queue after we have emptied it.
216      *
217      * Emptying the queue can be done in the state_change() callback of the
218      * sink input, when the new state is "unlinked".
219      *
220      * Preventing new messages from being written to the queue can be achieved
221      * by unlinking the source output before unlinking the sink input. There
222      * are no other writers for that queue, so this is sufficient. */
223 
224     if (u->source_output) {
225         pa_source_output_unlink(u->source_output);
226         pa_source_output_unref(u->source_output);
227         u->source_output = NULL;
228     }
229 
230     if (u->sink_input) {
231         pa_sink_input_unlink(u->sink_input);
232         pa_sink_input_unref(u->sink_input);
233         u->sink_input = NULL;
234     }
235 }
236 
237 /* rate controller, called from main context
238  * - maximum deviation from base rate is less than 1%
239  * - can create audible artifacts by changing the rate too quickly
240  * - exhibits hunting with USB or Bluetooth sources
241  */
242 static uint32_t rate_controller(
243                 uint32_t base_rate,
244                 pa_usec_t adjust_time,
245                 int32_t latency_difference_usec) {
246 
247     uint32_t new_rate;
248     double min_cycles;
249 
250     /* Calculate best rate to correct the current latency offset, limit at
251      * slightly below 1% difference from base_rate */
252     min_cycles = (double)abs(latency_difference_usec) / adjust_time / 0.01 + 1;
253     new_rate = base_rate * (1.0 + (double)latency_difference_usec / min_cycles / adjust_time);
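    /* Worked example with made-up numbers: base_rate = 48000 Hz,
     * adjust_time = 1 s and latency_difference_usec = +2000 give
     * min_cycles = 2000 / 1000000 / 0.01 + 1 = 1.2, so the rate is raised by
     * 2000 / 1.2 / 1000000 ~ 0.17 %, i.e. to roughly 48080 Hz. The "+ 1" in
     * min_cycles is what keeps the correction below the 1 % limit. */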
254 
255     return new_rate;
256 }
257 
258 /* Called from main thread.
259  * It has been a matter of discussion how to correctly calculate the minimum
260  * latency that module-loopback can deliver with a given source and sink.
261  * The calculation has been placed in a separate function so that the definition
262  * can easily be changed. The resulting estimate is not very exact because it
263 depends on the reported latency ranges. In cases where the lower bounds of
264  * source and sink latency are not reported correctly (USB) the result will
265  * be wrong. */
266 static void update_minimum_latency(struct userdata *u, pa_sink *sink, bool print_msg) {
267 
268     if (u->underrun_latency_limit)
269         /* If we already detected a real latency limit because of underruns, use it */
270         u->minimum_latency = u->underrun_latency_limit;
271 
272     else {
273         /* Calculate latency limit from latency ranges */
274 
275         u->minimum_latency = u->min_sink_latency;
276         if (u->fixed_alsa_source)
277             /* If we are using an alsa source with fixed latency, we will get a wakeup when
278              * one fragment is filled, and then we empty the source buffer, so the source
279              * latency never grows much beyond one fragment (assuming that the CPU doesn't
280              * cause a bottleneck). */
281             u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
282 
283         else
284             /* In all other cases the source will deliver new data at the latest after one source latency.
285              * Make sure there is enough data available that the sink can keep on playing until new
286              * data is pushed. */
287             u->minimum_latency += u->min_source_latency;
288 
289         /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
290         u->minimum_latency *= 1.1;
291 
292         /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
293         u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
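        /* Example with made-up numbers: 4 ms minimum sink latency plus 4 ms
         * minimum source latency gives 8 ms; the 1.1 factor raises that to
         * 8.8 ms and the fixed margin brings the estimate to 10.3 ms. */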
294     }
295 
296     /* Add the latency offsets */
297     if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency)
298         u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
299     else
300         u->minimum_latency = 0;
301 
302     /* If the sink is valid, send a message to update the minimum latency to
303      * the output thread, else set the variable directly */
304     if (sink)
305         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY, NULL, u->minimum_latency, NULL);
306     else
307         u->output_thread_info.minimum_latency = u->minimum_latency;
308 
309     if (print_msg) {
310         pa_log_info("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
311         if (u->latency < u->minimum_latency)
312             pa_log_warn("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead", (double)u->latency / PA_USEC_PER_MSEC);
313     }
314 }
315 
316 /* Called from main context */
317 static void adjust_rates(struct userdata *u) {
318     size_t buffer;
319     uint32_t old_rate, base_rate, new_rate, run_hours;
320     int32_t latency_difference;
321     pa_usec_t current_buffer_latency, snapshot_delay;
322     int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
323     pa_usec_t final_latency, now, time_passed;
324 
325     pa_assert(u);
326     pa_assert_ctl_context();
327 
328     /* Runtime and counters since last change of source or sink
329      * or source/sink latency */
330     run_hours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600;
331     u->iteration_counter += 1;
332 
333     /* If we are seeing underruns then the latency is too small */
334     if (u->underrun_counter > 2) {
335         pa_usec_t target_latency;
336 
337         target_latency = PA_MAX(u->latency, u->minimum_latency) + 5 * PA_USEC_PER_MSEC;
338 
339         if (u->max_latency == 0 || target_latency < u->max_latency) {
340             u->underrun_latency_limit = PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
341             pa_log_warn("Too many underruns, increasing latency to %0.2f ms", (double)target_latency / PA_USEC_PER_MSEC);
342         } else {
343             u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency, u->sink_latency_offset + u->source_latency_offset);
344             pa_log_warn("Too many underruns, configured maximum latency of %0.2f ms is reached", (double)u->max_latency / PA_USEC_PER_MSEC);
345             pa_log_warn("Consider increasing the max_latency_msec");
346         }
347 
348         update_minimum_latency(u, u->sink_input->sink, false);
349         u->underrun_counter = 0;
350     }
351 
352     /* Allow one underrun per hour */
353     if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600 > run_hours) {
354         u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
355         pa_log_info("Underrun counter: %u", u->underrun_counter);
356     }
357 
358     /* Calculate real adjust time if source or sink did not change and if the system has
359      * not been suspended. If the time between two calls is more than 5% longer than the
360      * configured adjust time, we assume that the system has been sleeping and skip the
361      * calculation for this iteration. */
362     now = pa_rtclock_now();
363     time_passed = now - u->adjust_time_stamp;
364     if (!u->source_sink_changed && time_passed < u->adjust_time * 1.05) {
365         u->adjust_counter++;
366         u->real_adjust_time_sum += time_passed;
367         u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
368     }
369     u->adjust_time_stamp = now;
370 
371     /* Rates and latencies */
372     old_rate = u->sink_input->sample_spec.rate;
373     base_rate = u->source_output->sample_spec.rate;
374 
375     buffer = u->latency_snapshot.loopback_memblockq_length;
376     if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
377         buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
378     else
379         buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
380 
381     current_buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
382     snapshot_delay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
383     current_source_sink_latency = u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency - snapshot_delay;
384 
385     /* Current latency */
386     current_latency = current_source_sink_latency + current_buffer_latency;
387 
388     /* Latency at base rate */
389     latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / base_rate;
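    /* current_buffer_latency above was computed at the sink input's current
     * (adjusted) rate; rescaling by old_rate / base_rate gives the time the
     * same amount of buffered data will take to drain once the rate has
     * returned to base_rate. */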
390 
391     final_latency = PA_MAX(u->latency, u->minimum_latency);
392     latency_difference = (int32_t)(latency_at_optimum_rate - final_latency);
393 
394     pa_log_debug("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
395                 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
396                 (double) current_buffer_latency / PA_USEC_PER_MSEC,
397                 (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
398                 (double) current_latency / PA_USEC_PER_MSEC);
399 
400     pa_log_debug("Loopback latency at base rate is %0.2f ms", (double)latency_at_optimum_rate / PA_USEC_PER_MSEC);
401 
402     /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
403     if (u->fast_adjust_threshold > 0 && abs(latency_difference) > u->fast_adjust_threshold) {
404         pa_log_debug("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.", u->fast_adjust_threshold / PA_USEC_PER_MSEC);
405 
406         pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, current_source_sink_latency, NULL);
407 
408         /* Skip real adjust time calculation on next iteration. */
409         u->source_sink_changed = true;
410         return;
411     }
412 
413     /* Calculate new rate */
414     new_rate = rate_controller(base_rate, u->real_adjust_time, latency_difference);
415 
416     u->source_sink_changed = false;
417 
418     /* Set rate */
419     pa_sink_input_set_rate(u->sink_input, new_rate);
420     pa_log_debug("[%s] Updated sampling rate to %lu Hz.", u->sink_input->sink->name, (unsigned long) new_rate);
421 }
422 
423 /* Called from main context */
424 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
425     struct userdata *u = userdata;
426 
427     pa_assert(u);
428     pa_assert(a);
429     pa_assert(u->time_event == e);
430 
431     /* Restart timer right away */
432     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
433 
434     /* Get sink and source latency snapshot */
435     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
436     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
437 
438     adjust_rates(u);
439 }
440 
441 /* Called from main context
442  * When source or sink changes, give it a third of a second to settle down, then call adjust_rates for the first time */
443 static void enable_adjust_timer(struct userdata *u, bool enable) {
444     if (enable) {
445         if (!u->adjust_time)
446             return;
447         if (u->time_event)
448             u->core->mainloop->time_free(u->time_event);
449 
450         u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + 333 * PA_USEC_PER_MSEC, time_callback, u);
451     } else {
452         if (!u->time_event)
453             return;
454 
455         u->core->mainloop->time_free(u->time_event);
456         u->time_event = NULL;
457     }
458 }
459 
460 /* Called from main context */
461 static void update_adjust_timer(struct userdata *u) {
462     if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED)
463         enable_adjust_timer(u, false);
464     else
465         enable_adjust_timer(u, true);
466 }
467 
468 /* Called from main thread
469  * Calculates minimum and maximum possible latency for source and sink */
470 static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
471     const char *s;
472 
473     if (source) {
474         /* Source latencies */
475         u->fixed_alsa_source = false;
476         if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
477             pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
478         else {
479             u->min_source_latency = pa_source_get_fixed_latency(source);
480             u->max_source_latency = u->min_source_latency;
481             if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
482                 if (pa_streq(s, "alsa"))
483                     u->fixed_alsa_source = true;
484             }
485         }
486         /* Source offset */
487         u->source_latency_offset = source->port_latency_offset;
488 
489         /* Latencies below 2.5 ms cause problems, limit source latency if possible */
490         if (u->max_source_latency >= MIN_DEVICE_LATENCY)
491             u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
492         else
493             u->min_source_latency = u->max_source_latency;
494     }
495 
496     if (sink) {
497         /* Sink latencies */
498         if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
499             pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
500         else {
501             u->min_sink_latency = pa_sink_get_fixed_latency(sink);
502             u->max_sink_latency = u->min_sink_latency;
503         }
504         /* Sink offset */
505         u->sink_latency_offset = sink->port_latency_offset;
506 
507         /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
508         if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
509             u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
510         else
511             u->min_sink_latency = u->max_sink_latency;
512     }
513 
514     update_minimum_latency(u, sink, true);
515 }
516 
517 /* Called from output context
518  * Sets the memblockq to the configured latency corrected by latency_offset_usec */
519 static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
520     size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
521     int64_t requested_buffer_latency;
522     pa_usec_t final_latency, requested_sink_latency;
523 
524     final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
525 
526     /* If source or sink have some large negative latency offset, we might want to
527      * hold more than final_latency in the memblockq */
528     requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;
529 
530     /* Keep at least one sink latency in the queue to make sure that the sink
531      * never underruns initially */
532     requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
533     if (requested_buffer_latency < (int64_t)requested_sink_latency)
534         requested_buffer_latency = requested_sink_latency;
535 
536     requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
537     current_memblockq_length = pa_memblockq_get_length(u->memblockq);
538 
539     if (current_memblockq_length > requested_memblockq_length) {
540         /* Drop audio from queue */
541         buffer_correction = current_memblockq_length - requested_memblockq_length;
542         pa_log_info("Dropping %" PRIu64 " usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
543         pa_memblockq_drop(u->memblockq, buffer_correction);
544 
545     } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
546         /* Add silence to queue */
547         buffer_correction = requested_memblockq_length - current_memblockq_length;
548         pa_log_info("Adding %" PRIu64 " usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
549         pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
550     }
551 }
552 
553 /* Called from input thread context */
554 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
555     struct userdata *u;
556     pa_usec_t push_time;
557     int64_t current_source_latency;
558 
559     pa_source_output_assert_ref(o);
560     pa_source_output_assert_io_context(o);
561     pa_assert_se(u = o->userdata);
562 
563     /* Send current source latency and timestamp with the message */
564     push_time = pa_rtclock_now();
565     current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
566 
567     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
568     u->send_counter += (int64_t) chunk->length;
569 }
570 
571 /* Called from input thread context */
572 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
573     struct userdata *u;
574 
575     pa_source_output_assert_ref(o);
576     pa_source_output_assert_io_context(o);
577     pa_assert_se(u = o->userdata);
578 
579     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
580     u->send_counter -= (int64_t) nbytes;
581 }
582 
583 /* Called from input thread context */
584 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
585     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
586 
587     switch (code) {
588 
589         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
590             size_t length;
591 
592             length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
593 
594             u->latency_snapshot.send_counter = u->send_counter;
595             /* Add content of delay memblockq to the source latency */
596             u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
597                                                  pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
598             u->latency_snapshot.source_timestamp = pa_rtclock_now();
599 
600             return 0;
601         }
602     }
603 
604     return pa_source_output_process_msg(obj, code, data, offset, chunk);
605 }
606 
607 /* Called from main thread.
608  * Get current effective latency of the source. If the source is in use with
609  * smaller latency than the configured latency, it will continue running with
610  * the smaller value when the source output is switched to the source. */
611 static void update_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {
612     pa_usec_t effective_source_latency;
613 
614     effective_source_latency = u->configured_source_latency;
615 
616     if (source) {
617         effective_source_latency = pa_source_get_requested_latency(source);
618         if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
619             effective_source_latency = u->configured_source_latency;
620     }
621 
622     /* If the sink is valid, send a message to the output thread, else set the variable directly */
623     if (sink)
624         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
625     else
626        u->output_thread_info.effective_source_latency = effective_source_latency;
627 }
628 
629 /* Called from main thread.
630  * Set source output latency to one third of the overall latency if possible.
631  * The choice of one third is rather arbitrary somewhere between the minimum
632  * possible latency which would cause a lot of CPU load and half the configured
633  * latency which would quickly lead to underruns */
634 static void set_source_output_latency(struct userdata *u, pa_source *source) {
635     pa_usec_t latency, requested_latency;
636 
637     requested_latency = u->latency / 3;
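    /* With the default of 200 ms overall latency this requests roughly 66 ms
     * from the source; the value is clamped to the device's latency range
     * below. */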
638 
639     /* Normally we try to configure sink and source latency equally. If the
640      * sink latency cannot match the requested source latency try to set the
641      * source latency to a smaller value to avoid underruns */
642     if (u->min_sink_latency > requested_latency) {
643         latency = PA_MAX(u->latency, u->minimum_latency);
644         requested_latency = (latency - u->min_sink_latency) / 2;
645     }
646 
647     latency = PA_CLAMP(requested_latency, u->min_source_latency, u->max_source_latency);
648     u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
649     if (u->configured_source_latency != requested_latency)
650         pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
651 }
652 
653 /* Called from input thread context */
654 static void source_output_attach_cb(pa_source_output *o) {
655     struct userdata *u;
656 
657     pa_source_output_assert_ref(o);
658     pa_source_output_assert_io_context(o);
659     pa_assert_se(u = o->userdata);
660 
661     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
662             o->source->thread_info.rtpoll,
663             PA_RTPOLL_LATE,
664             u->asyncmsgq);
665 }
666 
667 /* Called from input thread context */
668 static void source_output_detach_cb(pa_source_output *o) {
669     struct userdata *u;
670 
671     pa_source_output_assert_ref(o);
672     pa_source_output_assert_io_context(o);
673     pa_assert_se(u = o->userdata);
674 
675     if (u->rtpoll_item_write) {
676         pa_rtpoll_item_free(u->rtpoll_item_write);
677         u->rtpoll_item_write = NULL;
678     }
679 }
680 
681 /* Called from main thread */
682 static void source_output_kill_cb(pa_source_output *o) {
683     struct userdata *u;
684 
685     pa_source_output_assert_ref(o);
686     pa_assert_ctl_context();
687     pa_assert_se(u = o->userdata);
688 
689     teardown(u);
690     pa_module_unload_request(u->module, true);
691 }
692 
693 /* Called from main thread */
694 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
695     struct userdata *u;
696 
697     pa_source_output_assert_ref(o);
698     pa_assert_ctl_context();
699     pa_assert_se(u = o->userdata);
700 
701     if (!u->sink_input || !u->sink_input->sink)
702         return true;
703 
704     return dest != u->sink_input->sink->monitor_source;
705 }
706 
707 /* Called from main thread */
708 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
709     struct userdata *u;
710     char *input_description;
711     const char *n;
712 
713     if (!dest)
714         return;
715 
716     pa_source_output_assert_ref(o);
717     pa_assert_ctl_context();
718     pa_assert_se(u = o->userdata);
719 
720     input_description = pa_sprintf_malloc("Loopback of %s",
721                                           pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
722     pa_sink_input_set_property(u->sink_input, PA_PROP_MEDIA_NAME, input_description);
723     pa_xfree(input_description);
724 
725     if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
726         pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);
727 
728     /* Set latency and calculate latency limits */
729     u->underrun_latency_limit = 0;
730     update_latency_boundaries(u, dest, u->sink_input->sink);
731     set_source_output_latency(u, dest);
732     update_effective_source_latency(u, dest, u->sink_input->sink);
733 
734     /* Uncork the sink input unless the destination is suspended for a
735      * reason other than idle. */
736     if (dest->state == PA_SOURCE_SUSPENDED)
737         pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE));
738     else
739         pa_sink_input_cork(u->sink_input, false);
740 
741     update_adjust_timer(u);
742 
743     /* Reset counters */
744     u->iteration_counter = 0;
745     u->underrun_counter = 0;
746 
747     u->source_sink_changed = true;
748 
749     /* Send a message to the output thread that the source has changed.
750      * If the sink is invalid here during a profile switching situation
751      * we can safely set push_called to false directly. */
752     if (u->sink_input->sink)
753         pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
754     else
755         u->output_thread_info.push_called = false;
756 
757     /* The sampling rate may be far away from the default rate if we are still
758      * recovering from a previous source or sink change, so reset rate to
759      * default before moving the source. */
760     pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
761 }
762 
763 /* Called from main thread */
764 static void source_output_suspend_cb(pa_source_output *o, pa_source_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
765     struct userdata *u;
766     bool suspended;
767 
768     pa_source_output_assert_ref(o);
769     pa_assert_ctl_context();
770     pa_assert_se(u = o->userdata);
771 
772     /* State has not changed, nothing to do */
773     if (old_state == o->source->state)
774         return;
775 
776     suspended = (o->source->state == PA_SOURCE_SUSPENDED);
777 
778     /* If the source has been suspended, we need to handle this like
779      * a source change when the source is resumed */
780     if (suspended) {
781         if (u->sink_input->sink)
782             pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
783         else
784             u->output_thread_info.push_called = false;
785 
786     } else
787         /* Get effective source latency on unsuspend */
788         update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
789 
790     pa_sink_input_cork(u->sink_input, suspended);
791 
792     update_adjust_timer(u);
793 }
794 
795 /* Called from input thread context */
796 static void update_source_latency_range_cb(pa_source_output *i) {
797     struct userdata *u;
798 
799     pa_source_output_assert_ref(i);
800     pa_source_output_assert_io_context(i);
801     pa_assert_se(u = i->userdata);
802 
803     /* Source latency may have changed */
804     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
805 }
806 
807 /* Called from output thread context */
808 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
809     struct userdata *u;
810 
811     pa_sink_input_assert_ref(i);
812     pa_sink_input_assert_io_context(i);
813     pa_assert_se(u = i->userdata);
814     pa_assert(chunk);
815 
816     /* It seems necessary to handle outstanding push messages here, though it is not clear
817      * why. Removing this part leads to underruns when low latencies are configured. */
818     u->output_thread_info.in_pop = true;
819     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
820         ;
821     u->output_thread_info.in_pop = false;
822 
823     /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
824      * enabled. Disable them on second pop and enable the final adjustment during the
825      * next push. The adjustment must be done on the next push, because there is no way
826      * to retrieve the source latency here. We are waiting for the second pop, because
827      * the first pop may be called before the sink is actually started. */
828     if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
829         u->output_thread_info.pop_adjust = true;
830         u->output_thread_info.pop_called = true;
831     }
832     u->output_thread_info.first_pop_done = true;
833 
834     if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
835         pa_log_info("Could not peek into queue");
836         return -1;
837     }
838 
839     chunk->length = PA_MIN(chunk->length, nbytes);
840     pa_memblockq_drop(u->memblockq, chunk->length);
841 
842     /* Adjust the memblockq to ensure that there is
843      * enough data in the queue to avoid underruns. */
844     if (!u->output_thread_info.push_called)
845         memblockq_adjust(u, 0, true);
846 
847     return 0;
848 }
849 
850 /* Called from output thread context */
851 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
852     struct userdata *u;
853 
854     pa_sink_input_assert_ref(i);
855     pa_sink_input_assert_io_context(i);
856     pa_assert_se(u = i->userdata);
857 
858     pa_memblockq_rewind(u->memblockq, nbytes);
859 }
860 
861 /* Called from output thread context */
862 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
863     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
864 
865     pa_sink_input_assert_io_context(u->sink_input);
866 
867     switch (code) {
868 
869         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
870             pa_usec_t *r = data;
871 
872             *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
873 
874             /* Fall through, the default handler will add in the extra
875              * latency added by the resampler */
876             break;
877         }
878 
879         case SINK_INPUT_MESSAGE_POST:
880 
881             pa_memblockq_push_align(u->memblockq, chunk);
882 
883             /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
884              * are enabled. Disable them on first push and correct the memblockq. If pop
885              * has not been called yet, wait until the pop_cb() requests the adjustment */
886             if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust)) {
887                 int64_t time_delta;
888 
889                 /* This is the source latency at the time push was called */
890                 time_delta = PA_PTR_TO_INT(data);
891                 /* Add the time between push and post */
892                 time_delta += pa_rtclock_now() - (pa_usec_t) offset;
893                 /* Add the sink latency */
894                 time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
895 
896                 /* The source latency report includes the audio in the chunk,
897                  * but since we already pushed the chunk to the memblockq, we need
898                  * to subtract the chunk size from the source latency so that it
899                  * won't be counted towards both the memblockq latency and the
900                  * source latency.
901                  *
902                  * Sometimes the alsa source reports way too low latency (might
903                  * be a bug in the alsa source code). This seems to happen when
904                  * there's an overrun. As an attempt to detect overruns, we
905                  * check if the chunk size is larger than the configured source
906                  * latency. If so, we assume that the source should have pushed
907                  * a chunk whose size equals the configured latency, so we
908                  * modify time_delta only by that amount, which makes
909                  * memblockq_adjust() drop more data than it would otherwise.
910                  * This seems to work quite well, but it's possible that the
911                  * next push also contains too much data, and in that case the
912                  * resulting latency will be wrong. */
913                 if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
914                     time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
915                 else
916                     time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
917 
918                 /* FIXME: We allow pushing silence here to fix up the latency. This
919                  * might lead to a gap in the stream */
920                 memblockq_adjust(u, time_delta, true);
921 
922                 u->output_thread_info.pop_adjust = false;
923                 u->output_thread_info.push_called = true;
924             }
925 
926             /* If pop has not been called yet, make sure the latency does not grow too much.
927              * Don't push any silence here, because we already have new data in the queue */
928             if (!u->output_thread_info.pop_called)
929                  memblockq_adjust(u, 0, false);
930 
931             /* Is this the end of an underrun? Then let's start things
932              * right-away */
933             if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
934                 u->sink_input->thread_info.underrun_for > 0 &&
935                 pa_memblockq_is_readable(u->memblockq)) {
936 
937                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN, NULL, 0, NULL, NULL);
938                 /* If called from within the pop callback skip the rewind */
939                 if (!u->output_thread_info.in_pop) {
940                     pa_log_debug("Requesting rewind due to end of underrun.");
941                     pa_sink_input_request_rewind(u->sink_input,
942                                                  (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
943                                                  false, true, false);
944                 }
945             }
946 
947             u->output_thread_info.recv_counter += (int64_t) chunk->length;
948 
949             return 0;
950 
951         case SINK_INPUT_MESSAGE_REWIND:
952 
953             /* Do not try to rewind if no data was pushed yet */
954             if (u->output_thread_info.push_called)
955                 pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
956 
957             u->output_thread_info.recv_counter -= offset;
958 
959             return 0;
960 
961         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
962             size_t length;
963 
964             length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
965 
966             u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
967             u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
968             /* Add content of render memblockq to sink latency */
969             u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
970                                                pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
971             u->latency_snapshot.sink_timestamp = pa_rtclock_now();
972 
973             return 0;
974         }
975 
976         case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
977 
978             u->output_thread_info.push_called = false;
979 
980             return 0;
981 
982         case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
983 
984             u->output_thread_info.effective_source_latency = (pa_usec_t)offset;
985 
986             return 0;
987 
988         case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:
989 
990             u->output_thread_info.minimum_latency = (pa_usec_t)offset;
991 
992             return 0;
993 
994         case SINK_INPUT_MESSAGE_FAST_ADJUST:
995 
996             memblockq_adjust(u, offset, true);
997 
998             return 0;
999     }
1000 
1001     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1002 }

1003 /* Called from main thread.
1004  * Set sink input latency to one third of the overall latency if possible.
1005  * The choice of one third is rather arbitrary somewhere between the minimum
1006  * possible latency which would cause a lot of CPU load and half the configured
1007  * latency which would quickly lead to underruns. */
1008 static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
1009     pa_usec_t latency, requested_latency;
1010 
1011     requested_latency = u->latency / 3;
1012 
1013     /* Normally we try to configure sink and source latency equally. If the
1014      * source latency cannot match the requested sink latency try to set the
1015      * sink latency to a smaller value to avoid underruns */
1016     if (u->min_source_latency > requested_latency) {
1017         latency = PA_MAX(u->latency, u->minimum_latency);
1018         requested_latency = (latency - u->min_source_latency) / 2;
1019     }
1020 
1021     latency = PA_CLAMP(requested_latency, u->min_sink_latency, u->max_sink_latency);
1022     u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1023     if (u->configured_sink_latency != requested_latency)
1024         pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1025 }
1026 
1027 /* Called from output thread context */
1028 static void sink_input_attach_cb(pa_sink_input *i) {
1029     struct userdata *u;
1030 
1031     pa_sink_input_assert_ref(i);
1032     pa_sink_input_assert_io_context(i);
1033     pa_assert_se(u = i->userdata);
1034 
1035     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1036             i->sink->thread_info.rtpoll,
1037             PA_RTPOLL_LATE,
1038             u->asyncmsgq);
1039 
1040     pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
1041     pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
1042 }
1043 
1044 /* Called from output thread context */
1045 static void sink_input_detach_cb(pa_sink_input *i) {
1046     struct userdata *u;
1047 
1048     pa_sink_input_assert_ref(i);
1049     pa_sink_input_assert_io_context(i);
1050     pa_assert_se(u = i->userdata);
1051 
1052     if (u->rtpoll_item_read) {
1053         pa_rtpoll_item_free(u->rtpoll_item_read);
1054         u->rtpoll_item_read = NULL;
1055     }
1056 }
1057 
1058 /* Called from output thread context */
1059 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1060     struct userdata *u;
1061 
1062     pa_sink_input_assert_ref(i);
1063     pa_sink_input_assert_io_context(i);
1064     pa_assert_se(u = i->userdata);
1065 
1066     pa_memblockq_set_maxrewind(u->memblockq, nbytes);
1067 }
1068 
1069 /* Called from output thread context */
1070 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1071     struct userdata *u;
1072 
1073     pa_sink_input_assert_ref(i);
1074     pa_sink_input_assert_io_context(i);
1075     pa_assert_se(u = i->userdata);
1076 
1077     pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
1078     pa_log_info("Max request changed");
1079 }
1080 
1081 /* Called from main thread */
1082 static void sink_input_kill_cb(pa_sink_input *i) {
1083     struct userdata *u;
1084 
1085     pa_sink_input_assert_ref(i);
1086     pa_assert_ctl_context();
1087     pa_assert_se(u = i->userdata);
1088 
1089     teardown(u);
1090     pa_module_unload_request(u->module, true);
1091 }
1092 
1093 /* Called from the output thread context */
1094 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1095     struct userdata *u;
1096 
1097     pa_sink_input_assert_ref(i);
1098     pa_assert_se(u = i->userdata);
1099 
1100     if (state == PA_SINK_INPUT_UNLINKED)
1101         pa_asyncmsgq_flush(u->asyncmsgq, false);
1102 }
1103 
1104 /* Called from main thread */
1105 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1106     struct userdata *u;
1107     char *output_description;
1108     const char *n;
1109 
1110     if (!dest)
1111         return;
1112 
1113     pa_sink_input_assert_ref(i);
1114     pa_assert_ctl_context();
1115     pa_assert_se(u = i->userdata);
1116 
1117     output_description = pa_sprintf_malloc("Loopback to %s",
1118                                            pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1119     pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_NAME, output_description);
1120     pa_xfree(output_description);
1121 
1122     if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
1123         pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);
1124 
1125     /* Set latency and calculate latency limits */
1126     u->underrun_latency_limit = 0;
1127     update_latency_boundaries(u, NULL, dest);
1128     set_sink_input_latency(u, dest);
1129     update_effective_source_latency(u, u->source_output->source, dest);
1130 
1131     /* Uncork the source output unless the destination is suspended for a
1132      * reason other than idle. */
1133     if (dest->state == PA_SINK_SUSPENDED)
1134         pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE));
1135     else
1136         pa_source_output_cork(u->source_output, false);
1137 
1138     update_adjust_timer(u);
1139 
1140     /* Reset counters */
1141     u->iteration_counter = 0;
1142     u->underrun_counter = 0;
1143 
1144     u->source_sink_changed = true;
1145 
1146     u->output_thread_info.pop_called = false;
1147     u->output_thread_info.first_pop_done = false;
1148 
1149     /* Sample rate may be far away from the default rate if we are still
1150      * recovering from a previous source or sink change, so reset rate to
1151      * default before moving the sink. */
1152     pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
1153 }
1154 
1155 /* Called from main thread */
1156 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1157     struct userdata *u;
1158 
1159     pa_sink_input_assert_ref(i);
1160     pa_assert_ctl_context();
1161     pa_assert_se(u = i->userdata);
1162 
1163     if (!u->source_output || !u->source_output->source)
1164         return true;
1165 
1166     return dest != u->source_output->source->monitor_of;
1167 }
1168 
1169 /* Called from main thread */
1170 static void sink_input_suspend_cb(pa_sink_input *i, pa_sink_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
1171     struct userdata *u;
1172     bool suspended;
1173 
1174     pa_sink_input_assert_ref(i);
1175     pa_assert_ctl_context();
1176     pa_assert_se(u = i->userdata);
1177 
1178     /* State has not changed, nothing to do */
1179     if (old_state == i->sink->state)
1180         return;
1181 
1182     suspended = (i->sink->state == PA_SINK_SUSPENDED);
1183 
1184     /* If the sink has been suspended, we need to handle this like
1185      * a sink change when the sink is resumed. Because the sink
1186      * is suspended, we can set the variables directly. */
1187     if (suspended) {
1188         u->output_thread_info.pop_called = false;
1189         u->output_thread_info.first_pop_done = false;
1190 
1191     } else
1192         /* Set effective source latency on unsuspend */
1193         update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1194 
1195     pa_source_output_cork(u->source_output, suspended);
1196 
1197     update_adjust_timer(u);
1198 }
1199 
1200 /* Called from output thread context */
1201 static void update_sink_latency_range_cb(pa_sink_input *i) {
1202     struct userdata *u;
1203 
1204     pa_sink_input_assert_ref(i);
1205     pa_sink_input_assert_io_context(i);
1206     pa_assert_se(u = i->userdata);
1207 
1208     /* Sink latency may have changed */
1209     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
1210 }
1211 
1212 /* Called from main context */
1213 static int loopback_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1214     struct loopback_msg *msg;
1215     struct userdata *u;
1216     pa_usec_t current_latency;
1217 
1218     pa_assert(o);
1219     pa_assert_ctl_context();
1220 
1221     msg = LOOPBACK_MSG(o);
1222     pa_assert_se(u = msg->userdata);
1223 
1224     switch (code) {
1225 
1226         case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:
1227 
1228             update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1229             current_latency = pa_source_get_requested_latency(u->source_output->source);
1230             if (current_latency > u->configured_source_latency) {
1231                 /* The minimum latency has changed to a value larger than the configured latency, so
1232                  * the source latency has been increased. The case that the minimum latency changes
1233                  * back to a smaller value is not handled because this never happens with the current
1234                  * source implementations. */
1235                 pa_log_warn("Source minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1236                 u->configured_source_latency = current_latency;
1237                 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1238                 /* We re-start counting when the latency has changed */
1239                 u->iteration_counter = 0;
1240                 u->underrun_counter = 0;
1241             }
1242 
1243             return 0;
1244 
1245         case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:
1246 
1247             current_latency = pa_sink_get_requested_latency(u->sink_input->sink);
1248             if (current_latency > u->configured_sink_latency) {
1249                 /* The minimum latency has changed to a value larger than the configured latency, so
1250                  * the sink latency has been increased. The case that the minimum latency changes back
1251                  * to a smaller value is not handled because this never happens with the current sink
1252                  * implementations. */
1253                 pa_log_warn("Sink minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1254                 u->configured_sink_latency = current_latency;
1255                 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1256                 /* We re-start counting when the latency has changed */
1257                 u->iteration_counter = 0;
1258                 u->underrun_counter = 0;
1259             }
1260 
1261             return 0;
1262 
1263         case LOOPBACK_MESSAGE_UNDERRUN:
1264 
1265             u->underrun_counter++;
1266             pa_log_debug("Underrun detected, counter incremented to %u", u->underrun_counter);
1267 
1268             return 0;
1269 
1270     }
1271 
1272     return 0;
1273 }
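
/* Note: underrun_counter is only incremented here; it is evaluated by the
 * rate-adjustment code elsewhere in this file (see also
 * underrun_latency_limit in pa__init() below), presumably to raise the
 * target latency when underruns keep occurring. */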
1274 
1275 static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_sink *sink, struct userdata *u) {
1276 
1277     if (sink != u->sink_input->sink)
1278         return PA_HOOK_OK;
1279 
1280     u->sink_latency_offset = sink->port_latency_offset;
1281     update_minimum_latency(u, sink, true);
1282 
1283     return PA_HOOK_OK;
1284 }
1285 
1286 static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_source *source, struct userdata *u) {
1287 
1288     if (source != u->source_output->source)
1289         return PA_HOOK_OK;
1290 
1291     u->source_latency_offset = source->port_latency_offset;
1292     update_minimum_latency(u, u->sink_input->sink, true);
1293 
1294     return PA_HOOK_OK;
1295 }
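
/* Both port latency offset callbacks are connected as core hooks in
 * pa__init() below. They keep the cached offsets in the userdata in sync
 * with the ports so that update_minimum_latency() always works with
 * current values. */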
1296 
1297 int pa__init(pa_module *m) {
1298     pa_modargs *ma = NULL;
1299     struct userdata *u;
1300     pa_sink *sink = NULL;
1301     pa_sink_input_new_data sink_input_data;
1302     bool sink_dont_move;
1303     pa_source *source = NULL;
1304     pa_source_output_new_data source_output_data;
1305     bool source_dont_move;
1306     uint32_t latency_msec;
1307     uint32_t max_latency_msec;
1308     uint32_t fast_adjust_threshold;
1309     pa_sample_spec ss;
1310     pa_channel_map map;
1311     bool format_set = false;
1312     bool rate_set = false;
1313     bool channels_set = false;
1314     pa_memchunk silence;
1315     uint32_t adjust_time_sec;
1316     const char *n;
1317     bool remix = true;
1318 
1319     pa_assert(m);
1320 
1321     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1322         pa_log("Failed to parse module arguments");
1323         goto fail;
1324     }
1325 
1326     n = pa_modargs_get_value(ma, "source", NULL);
1327     if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
1328         pa_log("No such source.");
1329         goto fail;
1330     }
1331 
1332     n = pa_modargs_get_value(ma, "sink", NULL);
1333     if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1334         pa_log("No such sink.");
1335         goto fail;
1336     }
1337 
1338     if (pa_modargs_get_value_boolean(ma, "remix", &remix) < 0) {
1339         pa_log("Invalid boolean remix parameter");
1340         goto fail;
1341     }
1342 
1343     if (source) {
1344         ss = source->sample_spec;
1345         map = source->channel_map;
1346         format_set = true;
1347         rate_set = true;
1348         channels_set = true;
1349     } else if (sink) {
1350         ss = sink->sample_spec;
1351         map = sink->channel_map;
1352         format_set = true;
1353         rate_set = true;
1354         channels_set = true;
1355     } else {
1356         /* FIXME: Dummy stream format, needed because pa_sink_input_new()
1357          * requires valid sample spec and channel map even when all the FIX_*
1358          * stream flags are specified. pa_sink_input_new() should be changed
1359          * to ignore the sample spec and channel map when the FIX_* flags are
1360          * present. */
1361         ss.format = PA_SAMPLE_U8;
1362         ss.rate = 8000;
1363         ss.channels = 1;
1364         map.channels = 1;
1365         map.map[0] = PA_CHANNEL_POSITION_MONO;
1366     }
1367 
1368     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1369         pa_log("Invalid sample format specification or channel map");
1370         goto fail;
1371     }
1372 
1373     if (ss.rate < 4000 || ss.rate > PA_RATE_MAX) {
1374         pa_log("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
1375         goto fail;
1376     }
1377 
1378     if (pa_modargs_get_value(ma, "format", NULL))
1379         format_set = true;
1380 
1381     if (pa_modargs_get_value(ma, "rate", NULL))
1382         rate_set = true;
1383 
1384     if (pa_modargs_get_value(ma, "channels", NULL) || pa_modargs_get_value(ma, "channel_map", NULL))
1385         channels_set = true;
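
    /* format_set/rate_set/channels_set track whether each parameter was
     * pinned, either implicitly by an existing source/sink or explicitly on
     * the command line. Whatever remains unset is marked with the
     * corresponding FIX_* stream flag further down, so the stream simply
     * follows the device. */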
1386 
1387     latency_msec = DEFAULT_LATENCY_MSEC;
1388     if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 30000) {
1389         pa_log("Invalid latency specification");
1390         goto fail;
1391     }
1392 
1393     fast_adjust_threshold = 0;
1394     if (pa_modargs_get_value_u32(ma, "fast_adjust_threshold_msec", &fast_adjust_threshold) < 0 || (fast_adjust_threshold != 0 && fast_adjust_threshold < 100)) {
1395         pa_log("Invalid fast adjust threshold specification");
1396         goto fail;
1397     }
1398 
1399     max_latency_msec = 0;
1400     if (pa_modargs_get_value_u32(ma, "max_latency_msec", &max_latency_msec) < 0) {
1401         pa_log("Invalid maximum latency specification");
1402         goto fail;
1403     }
1404 
1405     if (max_latency_msec > 0 && max_latency_msec < latency_msec) {
1406         pa_log_warn("Configured maximum latency is smaller than latency, using latency instead");
1407         max_latency_msec = latency_msec;
1408     }
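
    /* Illustrative example: with the default latency_msec=200 and
     * max_latency_msec=100, the maximum would be raised to 200 ms here so
     * that the two values stay consistent. */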
1409 
1410     m->userdata = u = pa_xnew0(struct userdata, 1);
1411     u->core = m->core;
1412     u->module = m;
1413     u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
1414     u->max_latency = (pa_usec_t) max_latency_msec * PA_USEC_PER_MSEC;
1415     u->output_thread_info.pop_called = false;
1416     u->output_thread_info.pop_adjust = false;
1417     u->output_thread_info.push_called = false;
1418     u->iteration_counter = 0;
1419     u->underrun_counter = 0;
1420     u->underrun_latency_limit = 0;
1421     u->source_sink_changed = true;
1422     u->real_adjust_time_sum = 0;
1423     u->adjust_counter = 0;
1424     u->fast_adjust_threshold = fast_adjust_threshold * PA_USEC_PER_MSEC;
1425 
1426     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1427     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1428         pa_log("Failed to parse adjust_time value");
1429         goto fail;
1430     }
1431 
1432     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1433         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1434     else
1435         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
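
    /* adjust_time is given in seconds on the command line and stored in
     * microseconds. A value of 0 appears to disable the periodic rate
     * adjustment entirely; update_adjust_timer() (called near the end of
     * pa__init()) only arms the timer for a non-zero interval. */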
1436 
1437     u->real_adjust_time = u->adjust_time;
1438 
1439     pa_source_output_new_data_init(&source_output_data);
1440     source_output_data.driver = __FILE__;
1441     source_output_data.module = m;
1442     if (source)
1443         pa_source_output_new_data_set_source(&source_output_data, source, false, true);
1444 
1445     if (pa_modargs_get_proplist(ma, "source_output_properties", source_output_data.proplist, PA_UPDATE_REPLACE) < 0) {
1446         pa_log("Failed to parse the source_output_properties value.");
1447         pa_source_output_new_data_done(&source_output_data);
1448         goto fail;
1449     }
1450 
1451     if (!pa_proplist_contains(source_output_data.proplist, PA_PROP_MEDIA_ROLE))
1452         pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1453 
1454     pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
1455     pa_source_output_new_data_set_channel_map(&source_output_data, &map);
1456     source_output_data.flags = PA_SOURCE_OUTPUT_START_CORKED;
1457 
1458     if (!remix)
1459         source_output_data.flags |= PA_SOURCE_OUTPUT_NO_REMIX;
1460 
1461     if (!format_set)
1462         source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_FORMAT;
1463 
1464     if (!rate_set)
1465         source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_RATE;
1466 
1467     if (!channels_set)
1468         source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_CHANNELS;
1469 
1470     source_dont_move = false;
1471     if (pa_modargs_get_value_boolean(ma, "source_dont_move", &source_dont_move) < 0) {
1472         pa_log("source_dont_move= expects a boolean argument.");
1473         goto fail;
1474     }
1475 
1476     if (source_dont_move)
1477         source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
1478 
1479     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1480     pa_source_output_new_data_done(&source_output_data);
1481 
1482     if (!u->source_output)
1483         goto fail;
1484 
1485     u->source_output->parent.process_msg = source_output_process_msg_cb;
1486     u->source_output->push = source_output_push_cb;
1487     u->source_output->process_rewind = source_output_process_rewind_cb;
1488     u->source_output->kill = source_output_kill_cb;
1489     u->source_output->attach = source_output_attach_cb;
1490     u->source_output->detach = source_output_detach_cb;
1491     u->source_output->may_move_to = source_output_may_move_to_cb;
1492     u->source_output->moving = source_output_moving_cb;
1493     u->source_output->suspend = source_output_suspend_cb;
1494     u->source_output->update_source_latency_range = update_source_latency_range_cb;
1495     u->source_output->update_source_fixed_latency = update_source_latency_range_cb;
1496     u->source_output->userdata = u;
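
    /* Note that update_source_fixed_latency reuses
     * update_source_latency_range_cb: for this module both events presumably
     * amount to the same thing - the source latency may have changed - so
     * one handler covers both (the sink side below does the same). */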
1497 
1498     /* If format, rate or channels were originally unset, they are set now
1499      * after the pa_source_output_new() call. */
1500     ss = u->source_output->sample_spec;
1501     map = u->source_output->channel_map;
1502 
1503     pa_sink_input_new_data_init(&sink_input_data);
1504     sink_input_data.driver = __FILE__;
1505     sink_input_data.module = m;
1506 
1507     if (sink)
1508         pa_sink_input_new_data_set_sink(&sink_input_data, sink, false, true);
1509 
1510     if (pa_modargs_get_proplist(ma, "sink_input_properties", sink_input_data.proplist, PA_UPDATE_REPLACE) < 0) {
1511         pa_log("Failed to parse the sink_input_properties value.");
1512         pa_sink_input_new_data_done(&sink_input_data);
1513         goto fail;
1514     }
1515 
1516     if (!pa_proplist_contains(sink_input_data.proplist, PA_PROP_MEDIA_ROLE))
1517         pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1518 
1519     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
1520     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
1521     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;
1522 
1523     if (!remix)
1524         sink_input_data.flags |= PA_SINK_INPUT_NO_REMIX;
1525 
1526     sink_dont_move = false;
1527     if (pa_modargs_get_value_boolean(ma, "sink_dont_move", &sink_dont_move) < 0) {
1528         pa_log("sink_dont_move= expects a boolean argument.");
1529         goto fail;
1530     }
1531 
1532     if (sink_dont_move)
1533         sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;
1534 
1535     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1536     pa_sink_input_new_data_done(&sink_input_data);
1537 
1538     if (!u->sink_input)
1539         goto fail;
1540 
1541     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1542     u->sink_input->pop = sink_input_pop_cb;
1543     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1544     u->sink_input->kill = sink_input_kill_cb;
1545     u->sink_input->state_change = sink_input_state_change_cb;
1546     u->sink_input->attach = sink_input_attach_cb;
1547     u->sink_input->detach = sink_input_detach_cb;
1548     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1549     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1550     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1551     u->sink_input->moving = sink_input_moving_cb;
1552     u->sink_input->suspend = sink_input_suspend_cb;
1553     u->sink_input->update_sink_latency_range = update_sink_latency_range_cb;
1554     u->sink_input->update_sink_fixed_latency = update_sink_latency_range_cb;
1555     u->sink_input->userdata = u;
1556 
1557     update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1558     set_sink_input_latency(u, u->sink_input->sink);
1559     set_source_output_latency(u, u->source_output->source);
1560 
1561     pa_sink_input_get_silence(u->sink_input, &silence);
1562     u->memblockq = pa_memblockq_new(
1563             "module-loopback memblockq",
1564             0,                      /* idx */
1565             MEMBLOCKQ_MAXLENGTH,    /* maxlength */
1566             MEMBLOCKQ_MAXLENGTH,    /* tlength */
1567             &ss,                    /* sample_spec */
1568             0,                      /* prebuf */
1569             0,                      /* minreq */
1570             0,                      /* maxrewind */
1571             &silence);              /* silence frame */
1572     pa_memblock_unref(silence.memblock);
1573     /* Fill the memblockq with silence */
1574     pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);
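
    /* Illustrative example (the actual figures depend on the negotiated
     * sample spec): with the default latency of 200 ms and, say, 44100 Hz
     * S16LE stereo, pa_usec_to_bytes() yields 0.2 * 44100 * 4 = 35280 bytes
     * of silence, so the sink input can start reading immediately while the
     * source side fills the queue. */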
1575 
1576     u->asyncmsgq = pa_asyncmsgq_new(0);
1577     if (!u->asyncmsgq) {
1578         pa_log("pa_asyncmsgq_new() failed.");
1579         goto fail;
1580     }
1581 
1582     if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME))
1583         pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
1584                          pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1585 
1586     if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)
1587             && (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
1588         pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1589 
1590     if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME))
1591         pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
1592                          pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1593 
1594     if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)
1595             && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME)))
1596         pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1597 
1598     /* Hooks to track changes of latency offsets */
1599     pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
1600                            PA_HOOK_NORMAL, (pa_hook_cb_t) sink_port_latency_offset_changed_cb, u);
1601     pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
1602                            PA_HOOK_NORMAL, (pa_hook_cb_t) source_port_latency_offset_changed_cb, u);
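
    /* pa_module_hook_connect() ties the hook slots to the module, so they
     * are presumably released automatically when the module is unloaded and
     * no explicit disconnect is needed in pa__done(). */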
1603 
1604     /* Setup message handler for main thread */
1605     u->msg = pa_msgobject_new(loopback_msg);
1606     u->msg->parent.process_msg = loopback_process_msg_cb;
1607     u->msg->userdata = u;
1608 
1609     /* The output thread is not running yet, so set effective_source_latency directly */
1610     update_effective_source_latency(u, u->source_output->source, NULL);
1611 
1612     pa_sink_input_put(u->sink_input);
1613     pa_source_output_put(u->source_output);
1614 
1615     if (u->source_output->source->state != PA_SOURCE_SUSPENDED)
1616         pa_sink_input_cork(u->sink_input, false);
1617 
1618     if (u->sink_input->sink->state != PA_SINK_SUSPENDED)
1619         pa_source_output_cork(u->source_output, false);
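
    /* Note the cross-wise corking above: the sink input plays only while the
     * source is not suspended, and the source output records only while the
     * sink is not suspended. The suspend callbacks assigned earlier
     * presumably take over corking and uncorking when either device suspends
     * or resumes later on. */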
1620 
1621     update_adjust_timer(u);
1622 
1623     pa_modargs_free(ma);
1624     return 0;
1625 
1626 fail:
1627     if (ma)
1628         pa_modargs_free(ma);
1629 
1630     pa__done(m);
1631 
1632     return -1;
1633 }
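
/* Typical usage (illustrative only; device names and the latency value are
 * placeholders):
 *
 *   pactl load-module module-loopback source=<source_name> sink=<sink_name> latency_msec=50
 *
 * With no source/sink given, the streams are attached to the default devices
 * and may be moved later unless source_dont_move/sink_dont_move are set. */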
1634 
1635 void pa__done(pa_module *m) {
1636     struct userdata *u;
1637 
1638     pa_assert(m);
1639 
1640     if (!(u = m->userdata))
1641         return;
1642 
1643     teardown(u);
1644 
1645     if (u->memblockq)
1646         pa_memblockq_free(u->memblockq);
1647 
1648     if (u->asyncmsgq)
1649         pa_asyncmsgq_unref(u->asyncmsgq);
1650 
1651     pa_xfree(u);
1652 }
1653