• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2     This file is part of PulseAudio.
3 
4     Copyright 2009 Intel Corporation
5     Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6 
7     PulseAudio is free software; you can redistribute it and/or modify
8     it under the terms of the GNU Lesser General Public License as published
9     by the Free Software Foundation; either version 2.1 of the License,
10     or (at your option) any later version.
11 
12     PulseAudio is distributed in the hope that it will be useful, but
13     WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15     General Public License for more details.
16 
17     You should have received a copy of the GNU Lesser General Public License
18     along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 
27 #include <pulse/xmalloc.h>
28 
29 #include <pulsecore/sink-input.h>
30 #include <pulsecore/module.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/namereg.h>
33 #include <pulsecore/log.h>
34 #include <pulsecore/core-util.h>
35 
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 
/* Module metadata and the argument summary shown to users; the argument
 * names here must stay in sync with valid_modargs[] below */
PA_MODULE_AUTHOR("Pierre-Louis Bossart, Georg Chini");
PA_MODULE_DESCRIPTION("Loopback from source to sink");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE(
        "source=<source to connect to> "
        "sink=<sink to connect to> "
        "adjust_time=<how often to readjust rates in s> "
        "latency_msec=<latency in ms> "
        "max_latency_msec=<maximum latency in ms> "
        "log_interval=<how often to log in s> "
        "fast_adjust_threshold_msec=<threshold for fast adjust in ms> "
        "adjust_threshold_usec=<threshold for latency adjustment in usec> "
        "format=<sample format> "
        "rate=<sample rate> "
        "channels=<number of channels> "
        "channel_map=<channel map> "
        "sink_input_properties=<proplist> "
        "source_output_properties=<proplist> "
        "source_dont_move=<boolean> "
        "sink_dont_move=<boolean> "
        "remix=<remix channels?> ");
61 
/* Default target end-to-end latency when latency_msec is not given (ms) */
#define DEFAULT_LATENCY_MSEC 200

/* Coefficient of the 2nd order lowpass used for drift compensation and
 * error filtering in rate_controller() / adjust_rates() */
#define FILTER_PARAMETER 0.125

/* Default for the adjust_threshold_usec module argument (usec) */
#define DEFAULT_ADJUST_THRESHOLD_USEC 250

/* Maximum length of the loopback memblockq in bytes */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*32)

/* Device latencies below 2.5 ms cause problems, used as a lower clamp for
 * source/sink latency in update_latency_boundaries() */
#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)

/* Default interval between two adjust_rates() runs */
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
73 
typedef struct loopback_msg loopback_msg;

/* Per-module-instance state. Most members are owned by the main thread;
 * the latency_snapshot struct is filled via synchronous messages to the
 * I/O threads and the output_thread_info struct belongs to the output
 * thread (see the per-member comments below). */
struct userdata {
    pa_core *core;
    pa_module *module;

    /* Message object used to notify the main thread; marked dead in teardown() */
    loopback_msg *msg;

    pa_sink_input *sink_input;
    pa_source_output *source_output;

    /* Queue carrying audio from the source output to the sink input */
    pa_asyncmsgq *asyncmsgq;
    pa_memblockq *memblockq;

    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    /* Timer driving periodic rate adjustment, see time_callback() */
    pa_time_event *time_event;

    /* Variables used to calculate the average time between
     * subsequent calls of adjust_rates() */
    pa_usec_t adjust_time_stamp;
    pa_usec_t real_adjust_time;
    pa_usec_t real_adjust_time_sum;

    /* Values from command line configuration */
    pa_usec_t latency;
    pa_usec_t max_latency;
    pa_usec_t adjust_time;
    pa_usec_t fast_adjust_threshold;
    uint32_t adjust_threshold;
    uint32_t log_interval;

    /* Latency boundaries and current values */
    pa_usec_t min_source_latency;
    pa_usec_t max_source_latency;
    pa_usec_t min_sink_latency;
    pa_usec_t max_sink_latency;
    pa_usec_t configured_sink_latency;
    pa_usec_t configured_source_latency;
    int64_t source_latency_offset;
    int64_t sink_latency_offset;
    pa_usec_t minimum_latency;

    /* State variables of the latency controller */
    int32_t last_latency_difference;
    int64_t last_source_latency_offset;
    int64_t last_sink_latency_offset;
    int64_t next_latency_with_drift;
    int64_t next_latency_at_optimum_rate_with_drift;

    /* Filter variables used for the 2nd order lowpass filter that estimates
     * the rate difference (drift) between source and sink */
    double drift_filter;
    double drift_compensation_rate;

    /* Variables for Kalman filter and error tracking */
    double latency_variance;
    double kalman_variance;
    double latency_error;

    /* lower latency limit found by underruns */
    pa_usec_t underrun_latency_limit;

    /* Various counters */
    uint32_t iteration_counter;
    uint32_t underrun_counter;
    uint32_t adjust_counter;
    uint32_t target_latency_cross_counter;
    uint32_t log_counter;

    /* Various booleans */
    bool fixed_alsa_source;
    bool source_sink_changed;
    bool underrun_occured;
    bool source_latency_offset_changed;
    bool sink_latency_offset_changed;
    bool initial_adjust_pending;

    /* Used for sink input and source output snapshots. Filled by the
     * SINK_INPUT/SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT messages and then
     * evaluated by adjust_rates() in the main thread. */
    struct {
        int64_t send_counter;
        int64_t source_latency;
        pa_usec_t source_timestamp;

        int64_t recv_counter;
        size_t loopback_memblockq_length;
        int64_t sink_latency;
        pa_usec_t sink_timestamp;
    } latency_snapshot;

    /* Input thread variable */
    int64_t send_counter;

    /* Output thread variables */
    struct {
        int64_t recv_counter;
        pa_usec_t effective_source_latency;

        /* Copied from main thread */
        pa_usec_t minimum_latency;

        /* Various booleans */
        bool in_pop;
        bool pop_called;
        bool pop_adjust;
        bool first_pop_done;
        bool push_called;
    } output_thread_info;
};
182 
/* Message object for notifications sent back to the main thread. The dead
 * flag is set in teardown() so that messages still in flight can be ignored
 * when they arrive afterwards (handler not visible in this chunk). */
struct loopback_msg {
    pa_msgobject parent;
    struct userdata *userdata;
    bool dead;
};

PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
#define LOOPBACK_MSG(o) (loopback_msg_cast(o))
191 
/* Accepted module arguments; must agree with the PA_MODULE_USAGE string above */
static const char* const valid_modargs[] = {
    "source",
    "sink",
    "adjust_time",
    "latency_msec",
    "max_latency_msec",
    "log_interval",
    "fast_adjust_threshold_msec",
    "adjust_threshold_usec",
    "format",
    "rate",
    "channels",
    "channel_map",
    "sink_input_properties",
    "source_output_properties",
    "source_dont_move",
    "sink_dont_move",
    "remix",
    NULL,
};
212 
/* Messages sent to the sink input; numbering continues after the core's own
 * sink input message IDs */
enum {
    SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
    SINK_INPUT_MESSAGE_REWIND,
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
    SINK_INPUT_MESSAGE_SOURCE_CHANGED,
    SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
    SINK_INPUT_MESSAGE_FAST_ADJUST,
};

/* Messages sent to the source output */
enum {
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
};

/* Messages delivered through the loopback_msg object; presumably handled by
 * its message handler in the main thread (handler not visible in this chunk) */
enum {
    LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
    LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
    LOOPBACK_MESSAGE_UNDERRUN,
    LOOPBACK_MESSAGE_ADJUST_DONE,
};
233 
static void enable_adjust_timer(struct userdata *u, bool enable);

/* Called from main context.
 * Stops rate adjustment and disconnects the source output and sink input.
 * The unlink order below is significant — see the comment in the body. */
static void teardown(struct userdata *u) {
    pa_assert(u);
    pa_assert_ctl_context();

    /* Clearing adjust_time also prevents enable_adjust_timer() from ever
     * re-arming the timer after this point */
    u->adjust_time = 0;
    enable_adjust_timer(u, false);

    /* Mark the message object dead so that late messages can be ignored */
    if (u->msg)
        u->msg->dead = true;

    /* Handling the asyncmsgq between the source output and the sink input
     * requires some care. When the source output is unlinked, nothing needs
     * to be done for the asyncmsgq, because the source output is the sending
     * end. But when the sink input is unlinked, we should ensure that the
     * asyncmsgq is emptied, because the messages in the queue hold references
     * to the sink input. Also, we need to ensure that new messages won't be
     * written to the queue after we have emptied it.
     *
     * Emptying the queue can be done in the state_change() callback of the
     * sink input, when the new state is "unlinked".
     *
     * Preventing new messages from being written to the queue can be achieved
     * by unlinking the source output before unlinking the sink input. There
     * are no other writers for that queue, so this is sufficient. */

    if (u->source_output) {
        pa_source_output_unlink(u->source_output);
        pa_source_output_unref(u->source_output);
        u->source_output = NULL;
    }

    if (u->sink_input) {
        pa_sink_input_unlink(u->sink_input);
        pa_sink_input_unref(u->sink_input);
        u->sink_input = NULL;
    }
}
274 
/* Rate controller, called from main context.
 * Combines a weighted P-controller with a lowpass-filtered estimate of the
 * source/sink clock drift and returns the new sink input sample rate.
 * - maximum deviation from optimum rate for P-controller is less than 1%
 * - P-controller step size is limited to 2.01‰
 * - will calculate an optimum rate
 */
static uint32_t rate_controller(
                struct userdata *u,
                uint32_t base_rate, uint32_t old_rate,
                int32_t latency_difference_at_optimum_rate,
                int32_t latency_difference_at_base_rate) {

    double new_rate, new_rate_1, new_rate_2;
    double min_cycles_1, min_cycles_2, drift_rate, latency_drift, controller_weight, min_weight;
    uint32_t base_rate_with_drift;

    /* Optimum rate = nominal source rate plus the current drift estimate */
    base_rate_with_drift = (int)(base_rate + u->drift_compensation_rate);

    /* If we are less than 2‰ away from the optimum rate, lower weight of the
     * P-controller. The weight is determined by the fact that a correction
     * of 0.5 Hz needs to be applied by the controller when the latency
     * difference gets larger than the threshold. The weight follows
     * from the definition of the controller. The minimum will only
     * be reached when one adjust threshold away from the target. Start
     * using the weight after the target latency has been reached for the
     * second time to accelerate initial convergence. The second time has
     * been chosen because it takes a while before the smoother returns
     * reliable latencies. */
    controller_weight = 1;
    min_weight = PA_CLAMP(0.5 / (double)base_rate * (100.0 + (double)u->real_adjust_time / u->adjust_threshold), 0, 1.0);
    if ((double)abs((int)(old_rate - base_rate_with_drift)) / base_rate_with_drift < 0.002 && u->target_latency_cross_counter >= 2)
        controller_weight = PA_CLAMP((double)abs(latency_difference_at_optimum_rate) / u->adjust_threshold * min_weight, min_weight, 1.0);

    /* Calculate next rate that is not more than 2‰ away from the last rate */
    min_cycles_1 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.002 + 1;
    new_rate_1 = old_rate + base_rate * (double)latency_difference_at_optimum_rate / min_cycles_1 / u->real_adjust_time;

    /* Calculate best rate to correct the current latency offset, limit at
     * 1% difference from base_rate */
    min_cycles_2 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.01 + 1;
    new_rate_2 = (double)base_rate * (1.0 + controller_weight * latency_difference_at_optimum_rate / min_cycles_2 / u->real_adjust_time);

    /* Choose the rate that is nearer to base_rate unless we are already near
     * to the desired latency and rate */
    if (abs((int)(new_rate_1 - base_rate)) < abs((int)(new_rate_2 - base_rate)) && controller_weight > 0.99)
        new_rate = new_rate_1;
    else
        new_rate = new_rate_2;

    /* Calculate rate difference between source and sink. Skip calculation
     * after a source/sink change, an underrun or latency offset change,
     * because the latency measurements are unreliable then */

    if (!u->underrun_occured && !u->source_sink_changed && !u->source_latency_offset_changed && !u->sink_latency_offset_changed) {
        /* Latency difference between last iterations */
        latency_drift = latency_difference_at_base_rate - u->last_latency_difference;

        /* Calculate frequency difference between source and sink */
        drift_rate = latency_drift * old_rate / u->real_adjust_time + old_rate - base_rate;

        /* The maximum accepted sample rate difference between source and
         * sink is 1% of the base rate. If the result is larger, something
         * went wrong, so do not use it. Pass in 0 instead to allow the
         * filter to decay. */
        if (abs((int)drift_rate) > base_rate / 100)
            drift_rate = 0;

        /* 2nd order lowpass filter */
        u->drift_filter = (1 - FILTER_PARAMETER) * u->drift_filter + FILTER_PARAMETER * drift_rate;
        u->drift_compensation_rate =  (1 - FILTER_PARAMETER) * u->drift_compensation_rate + FILTER_PARAMETER * u->drift_filter;
    }

    /* Use drift compensation. Though not likely, the rate might exceed the maximum allowed rate now. */
    new_rate = new_rate + u->drift_compensation_rate + 0.5;

    /* Clamp the result to base_rate ± 1% */
    if (new_rate > base_rate * 101 / 100)
        return base_rate * 101 / 100;
    else if (new_rate < base_rate * 99 / 100)
        return base_rate * 99 / 100;
    else
        return (int)new_rate;
}
355 
356 /* Called from main thread.
357  * It has been a matter of discussion how to correctly calculate the minimum
358  * latency that module-loopback can deliver with a given source and sink.
359  * The calculation has been placed in a separate function so that the definition
360  * can easily be changed. The resulting estimate is not very exact because it
361  * depends on the reported latency ranges. In cases were the lower bounds of
362  * source and sink latency are not reported correctly (USB) the result will
363  * be wrong. */
update_minimum_latency(struct userdata * u,pa_sink * sink,bool print_msg)364 static void update_minimum_latency(struct userdata *u, pa_sink *sink, bool print_msg) {
365 
366     if (u->underrun_latency_limit)
367         /* If we already detected a real latency limit because of underruns, use it */
368         u->minimum_latency = u->underrun_latency_limit;
369 
370     else {
371         /* Calculate latency limit from latency ranges */
372 
373         u->minimum_latency = u->min_sink_latency;
374         if (u->fixed_alsa_source)
375             /* If we are using an alsa source with fixed latency, we will get a wakeup when
376              * one fragment is filled, and then we empty the source buffer, so the source
377              * latency never grows much beyond one fragment (assuming that the CPU doesn't
378              * cause a bottleneck). */
379             u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
380 
381         else
382             /* In all other cases the source will deliver new data at latest after one source latency.
383              * Make sure there is enough data available that the sink can keep on playing until new
384              * data is pushed. */
385             u->minimum_latency += u->min_source_latency;
386 
387         /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
388         u->minimum_latency *= 1.1;
389 
390         /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
391         u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
392     }
393 
394     /* Add the latency offsets */
395     if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency)
396         u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
397     else
398         u->minimum_latency = 0;
399 
400     /* If the sink is valid, send a message to update the minimum latency to
401      * the output thread, else set the variable directly */
402     if (sink)
403         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY, NULL, u->minimum_latency, NULL);
404     else
405         u->output_thread_info.minimum_latency = u->minimum_latency;
406 
407     if (print_msg) {
408         pa_log_info("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
409         if (u->latency < u->minimum_latency)
410             pa_log_warn("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead", (double)u->latency / PA_USEC_PER_MSEC);
411     }
412 }
413 
/* Called from main context.
 * Core of the latency controller: evaluates the latest latency snapshots,
 * maintains the drift/Kalman filter state, asks rate_controller() for a new
 * sample rate and applies it to the sink input. */
static void adjust_rates(struct userdata *u) {
    size_t buffer;
    uint32_t old_rate, base_rate, new_rate, run_hours;
    int32_t latency_difference;
    pa_usec_t current_buffer_latency, snapshot_delay;
    int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
    pa_usec_t final_latency, now, time_passed;
    double filtered_latency, current_latency_error, latency_correction, base_rate_with_drift;

    pa_assert(u);
    pa_assert_ctl_context();

    /* Runtime and counters since last change of source or sink
     * or source/sink latency */
    run_hours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600;
    u->iteration_counter +=1;

    /* If we are seeing underruns then the latency is too small */
    if (u->underrun_counter > 2) {
        pa_usec_t target_latency;

        target_latency = PA_MAX(u->latency, u->minimum_latency) + 5 * PA_USEC_PER_MSEC;

        if (u->max_latency == 0 || target_latency < u->max_latency) {
            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
            pa_log_warn("Too many underruns, increasing latency to %0.2f ms", (double)target_latency / PA_USEC_PER_MSEC);
        } else {
            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency, u->sink_latency_offset + u->source_latency_offset);
            pa_log_warn("Too many underruns, configured maximum latency of %0.2f ms is reached", (double)u->max_latency / PA_USEC_PER_MSEC);
            pa_log_warn("Consider increasing the max_latency_msec");
        }

        update_minimum_latency(u, u->sink_input->sink, false);
        u->underrun_counter = 0;
    }

    /* Allow one underrun per hour */
    if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600 > run_hours) {
        u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
        pa_log_info("Underrun counter: %u", u->underrun_counter);
    }

    /* Calculate real adjust time if source or sink did not change and if the system has
     * not been suspended. If the time between two calls is more than 5% longer than the
     * configured adjust time, we assume that the system has been sleeping and skip the
     * calculation for this iteration. When source or sink changed or the system has been
     * sleeping, we need to reset the parameters for drift compensation. */
    now = pa_rtclock_now();
    time_passed = now - u->adjust_time_stamp;
    if (!u->source_sink_changed && time_passed < u->adjust_time * 1.05) {
        u->adjust_counter++;
        u->real_adjust_time_sum += time_passed;
        u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
    } else {
        u->drift_compensation_rate = 0;
        u->drift_filter = 0;
        /* Ensure that source_sink_changed is set, so that the Kalman filter parameters
         * will also be reset. */
        u->source_sink_changed = true;
    }
    u->adjust_time_stamp = now;

    /* Rates and latencies */
    old_rate = u->sink_input->sample_spec.rate;
    base_rate = u->source_output->sample_spec.rate;

    /* Bytes in flight: memblockq content plus whatever was sent but not yet
     * received (or already received beyond what was sent) */
    buffer = u->latency_snapshot.loopback_memblockq_length;
    if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
        buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
    else
        buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));

    /* Correct for the fact that the two snapshots were not taken at the same time */
    current_buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
    snapshot_delay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
    current_source_sink_latency = u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency - snapshot_delay;

    /* Current latency */
    current_latency = current_source_sink_latency + current_buffer_latency;

    /* Latency at optimum rate and latency difference */
    latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / (u->drift_compensation_rate + base_rate);

    final_latency = PA_MAX(u->latency, u->minimum_latency);
    latency_difference = (int32_t)(current_latency - final_latency);

    /* Do not filter or calculate error if source or sink changed or if there was an underrun */
    if (u->source_sink_changed || u->underrun_occured) {
        /* Initial conditions are very unsure, so use a high variance */
        u->kalman_variance = 10000000;
        filtered_latency = latency_at_optimum_rate;
        u->next_latency_at_optimum_rate_with_drift = latency_at_optimum_rate;
        u->next_latency_with_drift = current_latency;

    } else {
        /* Correct predictions if one of the latency offsets changed between iterations */
        u->next_latency_at_optimum_rate_with_drift += u->source_latency_offset - u->last_source_latency_offset;
        u->next_latency_at_optimum_rate_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
        u->next_latency_with_drift += u->source_latency_offset - u->last_source_latency_offset;
        u->next_latency_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
        /* Low pass filtered latency error. This value reflects how well the measured values match the prediction. */
        u->latency_error = (1 - FILTER_PARAMETER) * u->latency_error + FILTER_PARAMETER * (double)abs((int32_t)(current_latency - u->next_latency_with_drift));
        /* Low pass filtered latency variance */
        current_latency_error = (double)abs((int32_t)(latency_at_optimum_rate - u->next_latency_at_optimum_rate_with_drift));
        u->latency_variance = (1.0 - FILTER_PARAMETER) * u->latency_variance + FILTER_PARAMETER * current_latency_error * current_latency_error;
        /* Kalman filter */
        filtered_latency = (latency_at_optimum_rate * u->kalman_variance + u->next_latency_at_optimum_rate_with_drift * u->latency_variance) / (u->kalman_variance + u->latency_variance);
        u->kalman_variance = u->kalman_variance * u->latency_variance / (u->kalman_variance + u->latency_variance) + u->latency_variance / 4 + 200;
    }

    /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
    if (u->fast_adjust_threshold > 0 && abs(latency_difference) > u->fast_adjust_threshold) {
        pa_log_debug ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.", u->fast_adjust_threshold / PA_USEC_PER_MSEC);

        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, current_source_sink_latency, NULL);

        /* Skip real adjust time calculation and reset drift compensation parameters on next iteration. */
        u->source_sink_changed = true;

        /* We probably need to adjust again, reset cross_counter. */
        u->target_latency_cross_counter = 0;
        return;
    }

    /* Calculate new rate */
    new_rate = rate_controller(u, base_rate, old_rate, (int32_t)(filtered_latency - final_latency), latency_difference);

    /* Log every log_interval iterations if the log_interval parameter is set */
    if (u->log_interval != 0) {
        u->log_counter--;
        if (u->log_counter == 0) {
            pa_log_debug("Loopback status %s to %s:\n    Source latency: %0.2f ms\n    Buffer: %0.2f ms\n    Sink latency: %0.2f ms\n    End-to-end latency: %0.2f ms\n"
                         "    Deviation from target latency at optimum rate: %0.2f usec\n    Average prediction error: ± %0.2f usec\n    Optimum rate: %0.2f Hz\n    Deviation from base rate: %i Hz",
                        u->source_output->source->name,
                        u->sink_input->sink->name,
                        (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
                        (double) current_buffer_latency / PA_USEC_PER_MSEC,
                        (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
                        (double) current_latency / PA_USEC_PER_MSEC,
                        (double) latency_at_optimum_rate - final_latency,
                        (double) u->latency_error,
                        u->drift_compensation_rate + base_rate,
                        (int32_t)(new_rate - base_rate));
            u->log_counter = u->log_interval;
        }
    }

    /* If the latency difference changed sign, we have crossed the target latency. */
    if ((int64_t)latency_difference * u->last_latency_difference < 0)
        u->target_latency_cross_counter++;

    /* Save current latency difference at new rate for next cycle and reset flags */
    u->last_latency_difference = current_source_sink_latency + current_buffer_latency * old_rate / new_rate - final_latency;

    /* Set variables that may change between calls of adjust_rate() */
    u->source_sink_changed = false;
    u->underrun_occured = false;
    u->last_source_latency_offset = u->source_latency_offset;
    u->last_sink_latency_offset = u->sink_latency_offset;
    u->source_latency_offset_changed = false;
    u->sink_latency_offset_changed = false;

    /* Prediction of next latency */

    /* Evaluate optimum rate */
    base_rate_with_drift = u->drift_compensation_rate + base_rate;

    /* Latency correction on next iteration */
    latency_correction = (base_rate_with_drift - new_rate) * (int64_t)u->real_adjust_time / new_rate;

    if ((int)new_rate != (int)base_rate_with_drift || new_rate != old_rate) {
        /* While we are correcting, the next latency is determined by the current value and the difference
         * between the new sampling rate and the base rate */
        u->next_latency_with_drift = current_latency + latency_correction + ((double)old_rate / new_rate - 1) * current_buffer_latency;
        u->next_latency_at_optimum_rate_with_drift = filtered_latency + latency_correction * new_rate / base_rate_with_drift;

    } else {
        /* We are in steady state, now only the fractional drift should matter.
         * To make sure that we do not drift away due to errors in the fractional
         * drift, use a running average of the measured and predicted values */
        u->next_latency_with_drift = (filtered_latency + u->next_latency_with_drift) / 2.0 + (1.0 - (double)(int)base_rate_with_drift / base_rate_with_drift) * (int64_t)u->real_adjust_time;

        /* We are at the optimum rate, so nothing to correct */
        u->next_latency_at_optimum_rate_with_drift = u->next_latency_with_drift;
    }

    /* Set rate */
    pa_sink_input_set_rate(u->sink_input, new_rate);
}
603 
604 /* Called from main context */
time_callback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)605 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
606     struct userdata *u = userdata;
607 
608     pa_assert(u);
609     pa_assert(a);
610     pa_assert(u->time_event == e);
611 
612     /* Restart timer right away */
613     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
614 
615     /* If the initial latency adjustment has not been done yet, we have to skip
616      * adjust_rates(). The estimation of the optimum rate cannot be done in that
617      * situation */
618     if (u->initial_adjust_pending)
619         return;
620 
621     /* Get sink and source latency snapshot */
622     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
623     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
624 
625     adjust_rates(u);
626 }
627 
628 /* Called from main context
629  * When source or sink changes, give it a third of a second to settle down, then call adjust_rates for the first time */
enable_adjust_timer(struct userdata * u,bool enable)630 static void enable_adjust_timer(struct userdata *u, bool enable) {
631     if (enable) {
632         if (!u->adjust_time)
633             return;
634         if (u->time_event)
635             u->core->mainloop->time_free(u->time_event);
636 
637         u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + 333 * PA_USEC_PER_MSEC, time_callback, u);
638     } else {
639         if (!u->time_event)
640             return;
641 
642         u->core->mainloop->time_free(u->time_event);
643         u->time_event = NULL;
644     }
645 }
646 
647 /* Called from main context */
update_adjust_timer(struct userdata * u)648 static void update_adjust_timer(struct userdata *u) {
649     if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED)
650         enable_adjust_timer(u, false);
651     else
652         enable_adjust_timer(u, true);
653 }
654 
/* Called from main thread
 * Calculates minimum and maximum possible latency for source and sink.
 * Either argument may be NULL, in which case that side's boundaries are
 * left untouched. Results are stored in u->min/max_source_latency,
 * u->min/max_sink_latency and the port latency offsets. */
static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
    const char *s;

    if (source) {
        /* Source latencies */
        u->fixed_alsa_source = false;
        if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
            pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
        else {
            /* Fixed-latency source: min and max collapse to the same value */
            u->min_source_latency = pa_source_get_fixed_latency(source);
            u->max_source_latency = u->min_source_latency;
            /* Remember fixed-latency alsa sources; they get special treatment
             * when the sink input latency is calculated (see
             * set_sink_input_latency()) */
            if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
                if (pa_streq(s, "alsa"))
                    u->fixed_alsa_source = true;
            }
        }
        /* Source offset */
        u->source_latency_offset = source->port_latency_offset;

        /* Latencies below 2.5 ms cause problems, limit source latency if possible */
        if (u->max_source_latency >= MIN_DEVICE_LATENCY)
            u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
        else
            u->min_source_latency = u->max_source_latency;
    }

    if (sink) {
        /* Sink latencies */
        if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
            pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
        else {
            /* Fixed-latency sink: min and max collapse to the same value */
            u->min_sink_latency = pa_sink_get_fixed_latency(sink);
            u->max_sink_latency = u->min_sink_latency;
        }
        /* Sink offset */
        u->sink_latency_offset = sink->port_latency_offset;

        /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
        if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
            u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
        else
            u->min_sink_latency = u->max_sink_latency;
    }

    /* Re-derive the overall minimum latency from the new boundaries */
    update_minimum_latency(u, sink, true);
}
703 
/* Called from output context
 * Sets the memblockq to the configured latency corrected by latency_offset_usec.
 * latency_offset_usec is subtracted from the target, so a positive offset
 * (e.g. latency already present elsewhere in the chain) shrinks the queue.
 * If allow_push is false the queue may only be shrunk, never padded with
 * silence. */
static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
    size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
    int64_t requested_buffer_latency;
    pa_usec_t final_latency, requested_sink_latency;

    /* Overall target latency, never below the possible minimum */
    final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);

    /* If source or sink have some large negative latency offset, we might want to
     * hold more than final_latency in the memblockq */
    requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;

    /* Keep at least one sink latency in the queue to make sure that the sink
     * never underruns initially */
    requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
    if (requested_buffer_latency < (int64_t)requested_sink_latency)
        requested_buffer_latency = requested_sink_latency;

    requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
    current_memblockq_length = pa_memblockq_get_length(u->memblockq);

    if (current_memblockq_length > requested_memblockq_length) {
        /* Drop audio from queue */
        buffer_correction = current_memblockq_length - requested_memblockq_length;
        pa_log_info("Dropping %" PRIu64 " usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
        pa_memblockq_drop(u->memblockq, buffer_correction);

    } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
        /* Add silence to queue by moving the write pointer forward; the
         * resulting hole is read back as silence */
        buffer_correction = requested_memblockq_length - current_memblockq_length;
        pa_log_info("Adding %" PRIu64 " usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
        pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
    }
}
739 
/* Called from input thread context.
 * Forwards each recorded chunk to the output thread together with the
 * source latency and a timestamp taken at push time. */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
    struct userdata *u;
    pa_usec_t push_time;
    int64_t current_source_latency;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    /* Send current source latency and timestamp with the message */
    push_time = pa_rtclock_now();
    current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
    /* Include the delay introduced by the source output's resampler */
    current_source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);

    /* The latency travels in the data pointer, the timestamp in the offset
     * field of the message */
    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
    /* Track the number of bytes sent; used for latency snapshots */
    u->send_counter += (int64_t) chunk->length;
}
758 
/* Called from input thread context.
 * Tells the output thread to drop nbytes of already-forwarded audio and
 * keeps the send counter consistent with what was actually delivered. */
static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
    u->send_counter -= (int64_t) nbytes;
}
770 
/* Called from input thread context.
 * Handles the latency snapshot request; everything else is delegated to
 * the default source output message handler. */
static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;

    switch (code) {

        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
            size_t length;

            length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);

            u->latency_snapshot.send_counter = u->send_counter;
            /* Add content of delay memblockq to the source latency */
            u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
                                                 pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
            /* Add resampler latency */
            u->latency_snapshot.source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);

            /* Timestamp so the main thread can extrapolate the snapshot */
            u->latency_snapshot.source_timestamp = pa_rtclock_now();

            return 0;
        }
    }

    return pa_source_output_process_msg(obj, code, data, offset, chunk);
}
797 
798 /* Called from main thread.
799  * Get current effective latency of the source. If the source is in use with
800  * smaller latency than the configured latency, it will continue running with
801  * the smaller value when the source output is switched to the source. */
update_effective_source_latency(struct userdata * u,pa_source * source,pa_sink * sink)802 static void update_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {
803     pa_usec_t effective_source_latency;
804 
805     effective_source_latency = u->configured_source_latency;
806 
807     if (source) {
808         effective_source_latency = pa_source_get_requested_latency(source);
809         if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
810             effective_source_latency = u->configured_source_latency;
811     }
812 
813     /* If the sink is valid, send a message to the output thread, else set the variable directly */
814     if (sink)
815         pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
816     else
817        u->output_thread_info.effective_source_latency = effective_source_latency;
818 }
819 
820 /* Called from main thread.
821  * Set source output latency to one third of the overall latency if possible.
822  * The choice of one third is rather arbitrary somewhere between the minimum
823  * possible latency which would cause a lot of CPU load and half the configured
824  * latency which would quickly lead to underruns */
set_source_output_latency(struct userdata * u,pa_source * source)825 static void set_source_output_latency(struct userdata *u, pa_source *source) {
826     pa_usec_t latency, requested_latency;
827 
828     requested_latency = u->latency / 3;
829 
830     /* Normally we try to configure sink and source latency equally. If the
831      * sink latency cannot match the requested source latency try to set the
832      * source latency to a smaller value to avoid underruns */
833     if (u->min_sink_latency > requested_latency) {
834         latency = PA_MAX(u->latency, u->minimum_latency);
835         requested_latency = (latency - u->min_sink_latency) / 2;
836     }
837 
838     latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency);
839     u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
840     if (u->configured_source_latency != requested_latency)
841         pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
842 }
843 
/* Called from input thread context.
 * Hooks the write end of the message queue into the source's rtpoll so
 * that posted messages wake up the input thread's event loop. */
static void source_output_attach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
            o->source->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);
}
857 
/* Called from input thread context.
 * Removes the queue's write end from the rtpoll again; it was installed
 * by source_output_attach_cb(). */
static void source_output_detach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    if (u->rtpoll_item_write) {
        pa_rtpoll_item_free(u->rtpoll_item_write);
        u->rtpoll_item_write = NULL;
    }
}
871 
/* Called from main thread.
 * Losing the source output makes the loopback useless, so tear down the
 * whole stream pair and request unloading of the module. */
static void source_output_kill_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    teardown(u);
    pa_module_unload_request(u->module, true);
}
883 
884 /* Called from main thread */
source_output_may_move_to_cb(pa_source_output * o,pa_source * dest)885 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
886     struct userdata *u;
887 
888     pa_source_output_assert_ref(o);
889     pa_assert_ctl_context();
890     pa_assert_se(u = o->userdata);
891 
892     if (!u->sink_input || !u->sink_input->sink)
893         return true;
894 
895     return dest != u->sink_input->sink->monitor_source;
896 }
897 
/* Called from main thread when the source output is moved to a new source.
 * Updates stream properties, recalculates latency limits for the new
 * device and resets the adjustment state machine. A NULL dest is ignored
 * (presumably the detached phase of a move — TODO confirm against core). */
static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
    struct userdata *u;
    char *input_description;
    const char *n;

    if (!dest)
        return;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* Update the sink input's media name and icon to match the new source */
    input_description = pa_sprintf_malloc("Loopback of %s",
                                          pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
    pa_sink_input_set_property(u->sink_input, PA_PROP_MEDIA_NAME, input_description);
    pa_xfree(input_description);

    if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);

    /* Set latency and calculate latency limits */
    u->underrun_latency_limit = 0;
    u->last_source_latency_offset = dest->port_latency_offset;
    u->initial_adjust_pending = true;
    update_latency_boundaries(u, dest, u->sink_input->sink);
    set_source_output_latency(u, dest);
    update_effective_source_latency(u, dest, u->sink_input->sink);

    /* Uncork the sink input unless the destination is suspended for other
     * reasons than idle. */
    if (dest->state == PA_SOURCE_SUSPENDED)
        pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE));
    else
        pa_sink_input_cork(u->sink_input, false);

    update_adjust_timer(u);

    /* Reset counters */
    u->iteration_counter = 0;
    u->underrun_counter = 0;

    /* Reset booleans, latency error and counters */
    u->source_sink_changed = true;
    u->underrun_occured = false;
    u->source_latency_offset_changed = false;
    u->target_latency_cross_counter = 0;
    u->log_counter = u->log_interval;
    u->latency_error = 0;

    /* Send a message to the output thread that the source has changed.
     * If the sink is invalid here during a profile switching situation
     * we can safely set push_called to false directly. */
    if (u->sink_input->sink)
        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
    else
        u->output_thread_info.push_called = false;

    /* The sampling rate may be far away from the default rate if we are still
     * recovering from a previous source or sink change, so reset rate to
     * default before moving the source. */
    pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
}
961 
/* Called from main thread when the source's suspend state changes.
 * Mirrors the suspend state onto the sink input by corking/uncorking it
 * and updates the adjustment timer accordingly. */
static void source_output_suspend_cb(pa_source_output *o, pa_source_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
    struct userdata *u;
    bool suspended;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* State has not changed, nothing to do */
    if (old_state == o->source->state)
        return;

    suspended = (o->source->state == PA_SOURCE_SUSPENDED);

    /* If the source has been suspended, we need to handle this like
     * a source change when the source is resumed */
    if (suspended) {
        if (u->sink_input->sink)
            pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
        else
            u->output_thread_info.push_called = false;

    } else
        /* Get effective source latency on unsuspend */
        update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);

    /* Keep the sink input corked exactly while the source is suspended */
    pa_sink_input_cork(u->sink_input, suspended);

    update_adjust_timer(u);
}
993 
/* Called from input thread context.
 * Notifies the main thread that the source's latency range changed so
 * the latency limits can be recalculated there. */
static void update_source_latency_range_cb(pa_source_output *i) {
    struct userdata *u;

    pa_source_output_assert_ref(i);
    pa_source_output_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    /* Source latency may have changed */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
1005 
/* Called from output thread context.
 * Delivers up to nbytes of queued loopback audio to the sink in chunk.
 * Returns 0 on success, negative if no data is available (the sink then
 * renders silence). */
static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);
    pa_assert(chunk);

    /* It seems necessary to handle outstanding push messages here, though it is not clear
     * why. Removing this part leads to underruns when low latencies are configured. */
    u->output_thread_info.in_pop = true;
    while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
        ;
    u->output_thread_info.in_pop = false;

    /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
     * enabled. Disable them on second pop and enable the final adjustment during the
     * next push. The adjustment must be done on the next push, because there is no way
     * to retrieve the source latency here. We are waiting for the second pop, because
     * the first pop may be called before the sink is actually started. */
    if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
        u->output_thread_info.pop_adjust = true;
        u->output_thread_info.pop_called = true;
    }
    u->output_thread_info.first_pop_done = true;

    if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
        pa_log_info("Could not peek into queue");
        return -1;
    }

    /* Hand out at most nbytes and consume exactly what was handed out */
    chunk->length = PA_MIN(chunk->length, nbytes);
    pa_memblockq_drop(u->memblockq, chunk->length);

    /* Adjust the memblockq to ensure that there is
     * enough data in the queue to avoid underruns. */
    if (!u->output_thread_info.push_called)
        memblockq_adjust(u, 0, true);

    return 0;
}
1048 
/* Called from output thread context.
 * Rewinds the loopback queue by the amount the sink rewound so that the
 * affected audio is played again. */
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_rewind(u->memblockq, nbytes);
}
1059 
/* Called from output thread context.
 * Central message handler of the output side: receives audio and control
 * messages from the input thread and latency queries from the core. */
static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK_INPUT(obj)->userdata;

    pa_sink_input_assert_io_context(u->sink_input);

    switch (code) {

        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
            pa_usec_t *r = data;

            /* Report the audio currently buffered in the loopback queue */
            *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);

            /* Fall through, the default handler will add in the extra
             * latency added by the resampler */
            break;
        }

        case SINK_INPUT_MESSAGE_POST:

            /* New audio from the input thread; data carries the source
             * latency at push time, offset the push timestamp */
            pa_memblockq_push_align(u->memblockq, chunk);

            /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
             * are enabled. Disable them on first push and correct the memblockq. If pop
             * has not been called yet, wait until the pop_cb() requests the adjustment */
            if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust)) {
                int64_t time_delta;

                /* This is the source latency at the time push was called */
                time_delta = PA_PTR_TO_INT(data);
                /* Add the time between push and post */
                time_delta += pa_rtclock_now() - (pa_usec_t) offset;
                /* Add the sink and resampler latency */
                time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
                time_delta += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);

                /* The source latency report includes the audio in the chunk,
                 * but since we already pushed the chunk to the memblockq, we need
                 * to subtract the chunk size from the source latency so that it
                 * won't be counted towards both the memblockq latency and the
                 * source latency.
                 *
                 * Sometimes the alsa source reports way too low latency (might
                 * be a bug in the alsa source code). This seems to happen when
                 * there's an overrun. As an attempt to detect overruns, we
                 * check if the chunk size is larger than the configured source
                 * latency. If so, we assume that the source should have pushed
                 * a chunk whose size equals the configured latency, so we
                 * modify time_delta only by that amount, which makes
                 * memblockq_adjust() drop more data than it would otherwise.
                 * This seems to work quite well, but it's possible that the
                 * next push also contains too much data, and in that case the
                 * resulting latency will be wrong. */
                if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
                    time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
                else
                    time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);

                /* FIXME: We allow pushing silence here to fix up the latency. This
                 * might lead to a gap in the stream */
                memblockq_adjust(u, time_delta, true);

                /* Notify main thread when the initial adjustment is done.
                 * NOTE(review): pop_called is already guaranteed true by the
                 * enclosing condition, so this inner check is redundant. */
                if (u->output_thread_info.pop_called)
                    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_ADJUST_DONE, NULL, 0, NULL, NULL);

                u->output_thread_info.pop_adjust = false;
                u->output_thread_info.push_called = true;
            }

            /* If pop has not been called yet, make sure the latency does not grow too much.
             * Don't push any silence here, because we already have new data in the queue */
            if (!u->output_thread_info.pop_called)
                 memblockq_adjust(u, 0, false);

            /* Is this the end of an underrun? Then let's start things
             * right-away */
            if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
                u->sink_input->thread_info.underrun_for > 0 &&
                pa_memblockq_is_readable(u->memblockq) &&
                u->output_thread_info.pop_called) {

                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN, NULL, 0, NULL, NULL);
                /* If called from within the pop callback skip the rewind */
                if (!u->output_thread_info.in_pop) {
                    pa_log_debug("Requesting rewind due to end of underrun.");
                    pa_sink_input_request_rewind(u->sink_input,
                                                 (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
                                                 false, true, false);
                }
            }

            u->output_thread_info.recv_counter += (int64_t) chunk->length;

            return 0;

        case SINK_INPUT_MESSAGE_REWIND:

            /* Do not try to rewind if no data was pushed yet */
            if (u->output_thread_info.push_called)
                pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);

            u->output_thread_info.recv_counter -= offset;

            return 0;

        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
            size_t length;

            length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);

            u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
            u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
            /* Add content of render memblockq to sink latency */
            u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
                                               pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
            /* Add resampler latency */
            u->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);

            u->latency_snapshot.sink_timestamp = pa_rtclock_now();

            return 0;
        }

        case SINK_INPUT_MESSAGE_SOURCE_CHANGED:

            /* Re-arm the initial latency adjustment for the new source */
            u->output_thread_info.push_called = false;

            return 0;

        case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:

            u->output_thread_info.effective_source_latency = (pa_usec_t)offset;

            return 0;

        case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:

            u->output_thread_info.minimum_latency = (pa_usec_t)offset;

            return 0;

        case SINK_INPUT_MESSAGE_FAST_ADJUST:

            /* Immediately correct the queue by the given latency offset */
            memblockq_adjust(u, offset, true);

            return 0;
    }

    /* Everything else is handled by the default message handler */
    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
1211 /* Called from main thread.
1212  * Set sink input latency to one third of the overall latency if possible.
1213  * The choice of one third is rather arbitrary somewhere between the minimum
1214  * possible latency which would cause a lot of CPU load and half the configured
1215  * latency which would quickly lead to underruns. */
set_sink_input_latency(struct userdata * u,pa_sink * sink)1216 static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
1217      pa_usec_t latency, requested_latency;
1218 
1219     requested_latency = u->latency / 3;
1220 
1221     /* Normally we try to configure sink and source latency equally. If the
1222      * source latency cannot match the requested sink latency try to set the
1223      * sink latency to a smaller value to avoid underruns */
1224     if (u->min_source_latency > requested_latency) {
1225         latency = PA_MAX(u->latency, u->minimum_latency);
1226         requested_latency = (latency - u->min_source_latency) / 2;
1227         /* In the case of a fixed alsa source, u->minimum_latency is calculated from
1228          * the default fragment size while u->min_source_latency is the reported minimum
1229          * of the source latency (nr_of_fragments * fragment_size). This can lead to a
1230          * situation where u->minimum_latency < u->min_source_latency. We only fall
1231          * back to use the fragment size instead of min_source_latency if the calculation
1232          * above does not deliver a usable result. */
1233         if (u->fixed_alsa_source && u->min_source_latency >= latency)
1234             requested_latency = (latency - u->core->default_fragment_size_msec * PA_USEC_PER_MSEC) / 2;
1235     }
1236 
1237     latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency);
1238     u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1239     if (u->configured_sink_latency != requested_latency)
1240         pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1241 }
1242 
/* Called from output thread context.
 * Hooks the read end of the message queue into the sink's rtpoll so that
 * messages from the input thread wake up the output thread, and sizes the
 * queue's prebuf/rewind limits for the new sink. */
static void sink_input_attach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
            i->sink->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);

    /* Require two maximum requests of buffered data before playback starts
     * and allow rewinding as far as the sink input can request */
    pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
    pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
}
1259 
/* Called from output thread context.
 * Removes the queue's read end from the rtpoll again; it was installed
 * by sink_input_attach_cb(). */
static void sink_input_detach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    if (u->rtpoll_item_read) {
        pa_rtpoll_item_free(u->rtpoll_item_read);
        u->rtpoll_item_read = NULL;
    }
}
1273 
/* Called from output thread context.
 * Keeps the loopback queue's rewind capability in sync with the sink. */
static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_set_maxrewind(u->memblockq, nbytes);
}
1284 
/* Called from output thread context.
 * Re-sizes the queue's prebuf to two maximum requests when the sink's
 * maximum request size changes. */
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
    pa_log_info("Max request changed");
}
1296 
/* Called from main thread.
 * Losing the sink input makes the loopback useless, so tear down the
 * whole stream pair and request unloading of the module. */
static void sink_input_kill_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    pa_assert_se(u = i->userdata);

    teardown(u);
    pa_module_unload_request(u->module, true);
}
1308 
/* Called from the output thread context.
 * Flushes pending messages from the queue once the sink input is
 * unlinked (without dispatching them — the second argument is false). */
static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    if (state == PA_SINK_INPUT_UNLINKED)
        pa_asyncmsgq_flush(u->asyncmsgq, false);
}
1319 
/* Called from main thread when the sink input is moved to a new sink.
 * Mirror of source_output_moving_cb(): updates stream properties,
 * recalculates latency limits for the new device and resets the
 * adjustment state machine. A NULL dest is ignored. */
static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
    struct userdata *u;
    char *output_description;
    const char *n;

    if (!dest)
        return;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    pa_assert_se(u = i->userdata);

    /* Update the source output's media name and icon to match the new sink */
    output_description = pa_sprintf_malloc("Loopback to %s",
                                           pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
    pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_NAME, output_description);
    pa_xfree(output_description);

    /* NOTE(review): the source-side counterpart (source_output_moving_cb)
     * stores the device icon under PA_PROP_DEVICE_ICON_NAME while this side
     * uses PA_PROP_MEDIA_ICON_NAME — confirm this asymmetry is intended */
    if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);

    /* Set latency and calculate latency limits */
    u->underrun_latency_limit = 0;
    u->last_sink_latency_offset = dest->port_latency_offset;
    u->initial_adjust_pending = true;
    update_latency_boundaries(u, NULL, dest);
    set_sink_input_latency(u, dest);
    update_effective_source_latency(u, u->source_output->source, dest);

    /* Uncork the source output unless the destination is suspended for other
     * reasons than idle */
    if (dest->state == PA_SINK_SUSPENDED)
        pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE));
    else
        pa_source_output_cork(u->source_output, false);

    update_adjust_timer(u);

    /* Reset counters */
    u->iteration_counter = 0;
    u->underrun_counter = 0;

    /* Reset booleans, latency error and counters */
    u->source_sink_changed = true;
    u->underrun_occured = false;
    u->sink_latency_offset_changed = false;
    u->target_latency_cross_counter = 0;
    u->log_counter = u->log_interval;
    u->latency_error = 0;

    /* Re-arm the initial latency adjustment on the output side */
    u->output_thread_info.pop_called = false;
    u->output_thread_info.first_pop_done = false;

    /* Sample rate may be far away from the default rate if we are still
     * recovering from a previous source or sink change, so reset rate to
     * default before moving the sink. */
    pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
}
1378 
1379 /* Called from main thread */
sink_input_may_move_to_cb(pa_sink_input * i,pa_sink * dest)1380 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1381     struct userdata *u;
1382 
1383     pa_sink_input_assert_ref(i);
1384     pa_assert_ctl_context();
1385     pa_assert_se(u = i->userdata);
1386 
1387     if (!u->source_output || !u->source_output->source)
1388         return true;
1389 
1390     return dest != u->source_output->source->monitor_of;
1391 }
1392 
/* Called from main thread. Mirror the sink's suspend state onto the
 * source output and reset output-thread bookkeeping so that resuming
 * is treated like a fresh sink change. */
static void sink_input_suspend_cb(pa_sink_input *i, pa_sink_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
    struct userdata *u;
    bool suspended;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    pa_assert_se(u = i->userdata);

    /* State has not changed, nothing to do */
    if (old_state == i->sink->state)
        return;

    suspended = (i->sink->state == PA_SINK_SUSPENDED);

    /* If the sink has been suspended, we need to handle this like
     * a sink change when the sink is resumed. Because the sink
     * is suspended, we can set the variables directly. */
    if (suspended) {
        u->output_thread_info.pop_called = false;
        u->output_thread_info.first_pop_done = false;

    } else
        /* Set effective source latency on unsuspend */
        update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);

    /* Cork the capture side while the playback side is suspended */
    pa_source_output_cork(u->source_output, suspended);

    update_adjust_timer(u);
}
1423 
/* Called from output thread context. The sink's (fixed or dynamic)
 * latency range changed; notify the main thread asynchronously so it
 * can re-evaluate the configured latency. */
static void update_sink_latency_range_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    /* Sink latency may have changed */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
1435 
/* Called from main context. Dispatches messages posted by the I/O
 * threads: latency range changes, underruns and completion of the
 * initial latency adjustment. Returns 0 in all cases. */
static int loopback_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
    struct loopback_msg *msg;
    struct userdata *u;
    pa_usec_t current_latency;

    pa_assert(o);
    pa_assert_ctl_context();

    msg = LOOPBACK_MSG(o);

    /* If messages are processed after a module unload request, they
     * must be ignored. */
    if (msg->dead)
        return 0;

    pa_assert_se(u = msg->userdata);

    switch (code) {

        case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:

            update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
            current_latency = pa_source_get_requested_latency(u->source_output->source);
            if (current_latency > u->configured_source_latency) {
                /* The minimum latency has changed to a value larger than the configured latency, so
                 * the source latency has been increased. The case that the minimum latency changes
                 * back to a smaller value is not handled because this never happens with the current
                 * source implementations. */
                pa_log_warn("Source minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
                u->configured_source_latency = current_latency;
                update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
                /* We re-start counting when the latency has changed */
                u->iteration_counter = 0;
                u->underrun_counter = 0;
            }

            return 0;

        case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:

            current_latency = pa_sink_get_requested_latency(u->sink_input->sink);
            if (current_latency > u->configured_sink_latency) {
                /* The minimum latency has changed to a value larger than the configured latency, so
                 * the sink latency has been increased. The case that the minimum latency changes back
                 * to a smaller value is not handled because this never happens with the current sink
                 * implementations. */
                pa_log_warn("Sink minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
                u->configured_sink_latency = current_latency;
                update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
                /* We re-start counting when the latency has changed */
                u->iteration_counter = 0;
                u->underrun_counter = 0;
            }

            return 0;

        case LOOPBACK_MESSAGE_UNDERRUN:

            /* An underrun invalidates the latency convergence state, so
             * reset the cross counter in addition to logging it. */
            u->underrun_counter++;
            u->underrun_occured = true;
            u->target_latency_cross_counter = 0;
            pa_log_debug("Underrun detected, counter incremented to %u", u->underrun_counter);

            return 0;

        case LOOPBACK_MESSAGE_ADJUST_DONE:

            /* First rate adjustment after init/move has completed */
            u->initial_adjust_pending = false;

            return 0;

    }

    return 0;
}
1512 
1513 /* Called from main thread */
sink_port_latency_offset_changed_cb(pa_core * core,pa_sink * sink,struct userdata * u)1514 static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_sink *sink, struct userdata *u) {
1515 
1516     if (sink != u->sink_input->sink)
1517         return PA_HOOK_OK;
1518 
1519     if (!u->sink_latency_offset_changed)
1520         u->last_sink_latency_offset = u->sink_latency_offset;
1521     u->sink_latency_offset_changed = true;
1522     u->sink_latency_offset = sink->port_latency_offset;
1523     update_minimum_latency(u, sink, true);
1524 
1525     /* We might need to adjust again, reset counter */
1526     u->target_latency_cross_counter = 0;
1527 
1528     return PA_HOOK_OK;
1529 }
1530 
/* Called from main thread. Hook fired whenever a source's port latency
 * offset changes; if it is our source, remember the previous offset and
 * recompute the minimum latency. */
static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_source *source, struct userdata *u) {

    if (source != u->source_output->source)
        return PA_HOOK_OK;

    /* Preserve the offset valid before the first change so the latency
     * correction can compare against it */
    if (!u->source_latency_offset_changed)
        u->last_source_latency_offset = u->source_latency_offset;
    u->source_latency_offset_changed = true;
    u->source_latency_offset = source->port_latency_offset;
    update_minimum_latency(u, u->sink_input->sink, true);

    /* We might need to adjust again, reset counter */
    u->target_latency_cross_counter = 0;

    return PA_HOOK_OK;
}
1548 
/* Module entry point, called from main thread. Parses module arguments,
 * creates the source-output/sink-input pair, wires up all callbacks and
 * hooks, allocates the intermediate memblockq and starts both streams.
 * Returns 0 on success, -1 on failure (all partial state is released
 * via pa__done()). */
int pa__init(pa_module *m) {
    pa_modargs *ma = NULL;
    struct userdata *u;
    pa_sink *sink = NULL;
    pa_sink_input_new_data sink_input_data;
    bool sink_dont_move;
    pa_source *source = NULL;
    pa_source_output_new_data source_output_data;
    bool source_dont_move;
    uint32_t latency_msec;
    uint32_t max_latency_msec;
    uint32_t fast_adjust_threshold;
    uint32_t adjust_threshold;
    pa_sample_spec ss;
    pa_channel_map map;
    bool format_set = false;
    bool rate_set = false;
    bool channels_set = false;
    pa_memchunk silence;
    double adjust_time_sec;
    double log_interval_sec;
    const char *n;
    bool remix = true;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments");
        goto fail;
    }

    /* Resolve optional explicit source/sink by name */
    n = pa_modargs_get_value(ma, "source", NULL);
    if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
        pa_log("No such source.");
        goto fail;
    }

    n = pa_modargs_get_value(ma, "sink", NULL);
    if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
        pa_log("No such sink.");
        goto fail;
    }

    if (pa_modargs_get_value_boolean(ma, "remix", &remix) < 0) {
        pa_log("Invalid boolean remix parameter");
        goto fail;
    }

    /* Derive default sample spec/map from the given source or sink;
     * the *_set flags record whether the format was actually fixed by
     * a device (or, below, by module arguments). */
    if (source) {
        ss = source->sample_spec;
        map = source->channel_map;
        format_set = true;
        rate_set = true;
        channels_set = true;
    } else if (sink) {
        ss = sink->sample_spec;
        map = sink->channel_map;
        format_set = true;
        rate_set = true;
        channels_set = true;
    } else {
        /* FIXME: Dummy stream format, needed because pa_sink_input_new()
         * requires valid sample spec and channel map even when all the FIX_*
         * stream flags are specified. pa_sink_input_new() should be changed
         * to ignore the sample spec and channel map when the FIX_* flags are
         * present. */
        ss.format = PA_SAMPLE_U8;
        ss.rate = 8000;
        ss.channels = 1;
        map.channels = 1;
        map.map[0] = PA_CHANNEL_POSITION_MONO;
    }

    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    if (ss.rate < 4000 || ss.rate > PA_RATE_MAX) {
        pa_log("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
        goto fail;
    }

    /* Explicit module arguments also fix the respective property */
    if (pa_modargs_get_value(ma, "format", NULL))
        format_set = true;

    if (pa_modargs_get_value(ma, "rate", NULL))
        rate_set = true;

    if (pa_modargs_get_value(ma, "channels", NULL) || pa_modargs_get_value(ma, "channel_map", NULL))
        channels_set = true;

    /* Valid range: 1 .. 10000 usec */
    adjust_threshold = DEFAULT_ADJUST_THRESHOLD_USEC;
    if (pa_modargs_get_value_u32(ma, "adjust_threshold_usec", &adjust_threshold) < 0 || adjust_threshold < 1 || adjust_threshold > 10000) {
        pa_log_info("Invalid adjust threshold specification");
        goto fail;
    }

    /* Valid range: 1 .. 30000 msec */
    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 30000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    /* 0 disables fast adjust; otherwise at least 100 msec */
    fast_adjust_threshold = 0;
    if (pa_modargs_get_value_u32(ma, "fast_adjust_threshold_msec", &fast_adjust_threshold) < 0 || (fast_adjust_threshold != 0 && fast_adjust_threshold < 100)) {
        pa_log("Invalid fast adjust threshold specification");
        goto fail;
    }

    max_latency_msec = 0;
    if (pa_modargs_get_value_u32(ma, "max_latency_msec", &max_latency_msec) < 0) {
        pa_log("Invalid maximum latency specification");
        goto fail;
    }

    if (max_latency_msec > 0 && max_latency_msec < latency_msec) {
        pa_log_warn("Configured maximum latency is smaller than latency, using latency instead");
        max_latency_msec = latency_msec;
    }

    /* Allocate and pre-initialize the userdata structure */
    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->core = m->core;
    u->module = m;
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
    u->max_latency = (pa_usec_t) max_latency_msec * PA_USEC_PER_MSEC;
    u->output_thread_info.pop_called = false;
    u->output_thread_info.pop_adjust = false;
    u->output_thread_info.push_called = false;
    u->iteration_counter = 0;
    u->underrun_counter = 0;
    u->underrun_latency_limit = 0;
    u->source_sink_changed = true;
    u->real_adjust_time_sum = 0;
    u->adjust_counter = 0;
    u->fast_adjust_threshold = fast_adjust_threshold * PA_USEC_PER_MSEC;
    u->underrun_occured = false;
    u->source_latency_offset_changed = false;
    u->sink_latency_offset_changed = false;
    u->latency_error = 0;
    u->adjust_threshold = adjust_threshold;
    u->target_latency_cross_counter = 0;
    u->initial_adjust_pending = true;

    adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
    if (pa_modargs_get_value_double(ma, "adjust_time", &adjust_time_sec) < 0) {
        pa_log("Failed to parse adjust_time value");
        goto fail;
    }

    /* Allow values >= 0.1 and also 0 which means no adjustment */
    if (adjust_time_sec < 0.1) {
        /* Any non-zero value below 0.1 (including negatives) is rejected;
         * exactly 0 passes through. */
        if (adjust_time_sec < 0 || adjust_time_sec > 0) {
            pa_log("Failed to parse adjust_time value");
            goto fail;
        }
    }

    u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
    u->real_adjust_time = u->adjust_time;

    /* --- Create the source output (capture side), corked --- */
    pa_source_output_new_data_init(&source_output_data);
    source_output_data.driver = __FILE__;
    source_output_data.module = m;
    if (source)
        pa_source_output_new_data_set_source(&source_output_data, source, false, true);

    if (pa_modargs_get_proplist(ma, "source_output_properties", source_output_data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Failed to parse the source_output_properties value.");
        pa_source_output_new_data_done(&source_output_data);
        goto fail;
    }

    if (!pa_proplist_contains(source_output_data.proplist, PA_PROP_MEDIA_ROLE))
        pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");

    pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
    pa_source_output_new_data_set_channel_map(&source_output_data, &map);
    source_output_data.flags = PA_SOURCE_OUTPUT_START_CORKED;

    if (!remix)
        source_output_data.flags |= PA_SOURCE_OUTPUT_NO_REMIX;

    /* Let the device decide any property not explicitly configured */
    if (!format_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_FORMAT;

    if (!rate_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_RATE;

    if (!channels_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_CHANNELS;

    source_dont_move = false;
    if (pa_modargs_get_value_boolean(ma, "source_dont_move", &source_dont_move) < 0) {
        pa_log("source_dont_move= expects a boolean argument.");
        goto fail;
    }

    if (source_dont_move)
        source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;

    pa_source_output_new(&u->source_output, m->core, &source_output_data);
    pa_source_output_new_data_done(&source_output_data);

    if (!u->source_output)
        goto fail;

    /* Install source output callbacks */
    u->source_output->parent.process_msg = source_output_process_msg_cb;
    u->source_output->push = source_output_push_cb;
    u->source_output->process_rewind = source_output_process_rewind_cb;
    u->source_output->kill = source_output_kill_cb;
    u->source_output->attach = source_output_attach_cb;
    u->source_output->detach = source_output_detach_cb;
    u->source_output->may_move_to = source_output_may_move_to_cb;
    u->source_output->moving = source_output_moving_cb;
    u->source_output->suspend = source_output_suspend_cb;
    u->source_output->update_source_latency_range = update_source_latency_range_cb;
    u->source_output->update_source_fixed_latency = update_source_latency_range_cb;
    u->source_output->userdata = u;

    /* If format, rate or channels were originally unset, they are set now
     * after the pa_source_output_new() call. */
    ss = u->source_output->sample_spec;
    map = u->source_output->channel_map;

    /* Get log interval, default is 0, which means no logging */
    log_interval_sec = 0;
    if (pa_modargs_get_value_double(ma, "log_interval", &log_interval_sec) < 0) {
        pa_log_info("Invalid log interval specification");
        goto fail;
    }

    /* Allow values >= 0.1 and also 0 */
    if (log_interval_sec < 0.1) {
        if (log_interval_sec < 0 || log_interval_sec > 0) {
            pa_log("Failed to parse log_interval value");
            goto fail;
        }
    }

    /* Estimate number of iterations for logging. */
    u->log_interval = 0;
    if (u->adjust_time != 0 && log_interval_sec != 0) {
        u->log_interval = (int)(log_interval_sec * PA_USEC_PER_SEC / u->adjust_time + 0.5);
        /* Logging was specified, but log interval parameter was too small,
         * therefore log on every iteration */
        if (u->log_interval == 0)
            u->log_interval = 1;
    }
    u->log_counter = u->log_interval;

    /* --- Create the sink input (playback side), corked, variable rate --- */
    pa_sink_input_new_data_init(&sink_input_data);
    sink_input_data.driver = __FILE__;
    sink_input_data.module = m;

    if (sink)
        pa_sink_input_new_data_set_sink(&sink_input_data, sink, false, true);

    if (pa_modargs_get_proplist(ma, "sink_input_properties", sink_input_data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Failed to parse the sink_input_properties value.");
        pa_sink_input_new_data_done(&sink_input_data);
        goto fail;
    }

    if (!pa_proplist_contains(sink_input_data.proplist, PA_PROP_MEDIA_ROLE))
        pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");

    pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
    pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;

    if (!remix)
        sink_input_data.flags |= PA_SINK_INPUT_NO_REMIX;

    sink_dont_move = false;
    if (pa_modargs_get_value_boolean(ma, "sink_dont_move", &sink_dont_move) < 0) {
        pa_log("sink_dont_move= expects a boolean argument.");
        goto fail;
    }

    if (sink_dont_move)
        sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;

    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
    pa_sink_input_new_data_done(&sink_input_data);

    if (!u->sink_input)
        goto fail;

    /* Install sink input callbacks */
    u->sink_input->parent.process_msg = sink_input_process_msg_cb;
    u->sink_input->pop = sink_input_pop_cb;
    u->sink_input->process_rewind = sink_input_process_rewind_cb;
    u->sink_input->kill = sink_input_kill_cb;
    u->sink_input->state_change = sink_input_state_change_cb;
    u->sink_input->attach = sink_input_attach_cb;
    u->sink_input->detach = sink_input_detach_cb;
    u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
    u->sink_input->update_max_request = sink_input_update_max_request_cb;
    u->sink_input->may_move_to = sink_input_may_move_to_cb;
    u->sink_input->moving = sink_input_moving_cb;
    u->sink_input->suspend = sink_input_suspend_cb;
    u->sink_input->update_sink_latency_range = update_sink_latency_range_cb;
    u->sink_input->update_sink_fixed_latency = update_sink_latency_range_cb;
    u->sink_input->userdata = u;

    /* Initialize latency bookkeeping from the actual devices */
    u->last_source_latency_offset = u->source_output->source->port_latency_offset;
    u->last_sink_latency_offset = u->sink_input->sink->port_latency_offset;
    update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
    set_sink_input_latency(u, u->sink_input->sink);
    set_source_output_latency(u, u->source_output->source);

    pa_sink_input_get_silence(u->sink_input, &silence);
    u->memblockq = pa_memblockq_new(
            "module-loopback memblockq",
            0,                      /* idx */
            MEMBLOCKQ_MAXLENGTH,    /* maxlength */
            MEMBLOCKQ_MAXLENGTH,    /* tlength */
            &ss,                    /* sample_spec */
            0,                      /* prebuf */
            0,                      /* minreq */
            0,                      /* maxrewind */
            &silence);              /* silence frame */
    pa_memblock_unref(silence.memblock);
    /* Fill the memblockq with silence */
    pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);

    u->asyncmsgq = pa_asyncmsgq_new(0);
    if (!u->asyncmsgq) {
        pa_log("pa_asyncmsgq_new() failed.");
        goto fail;
    }

    /* Fill in default media name/icon properties if not supplied */
    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME))
        pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
                         pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));

    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)
            && (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);

    if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME))
        pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
                         pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));

    if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)
            && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);

    /* Hooks to track changes of latency offsets */
    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
                           PA_HOOK_NORMAL, (pa_hook_cb_t) sink_port_latency_offset_changed_cb, u);
    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
                           PA_HOOK_NORMAL, (pa_hook_cb_t) source_port_latency_offset_changed_cb, u);

    /* Setup message handler for main thread */
    u->msg = pa_msgobject_new(loopback_msg);
    u->msg->parent.process_msg = loopback_process_msg_cb;
    u->msg->userdata = u;
    u->msg->dead = false;

    /* The output thread is not yet running, set effective_source_latency directly */
    update_effective_source_latency(u, u->source_output->source, NULL);

    /* Start both streams */
    pa_sink_input_put(u->sink_input);
    pa_source_output_put(u->source_output);

    /* Uncork each side only if its counterpart device is not suspended */
    if (u->source_output->source->state != PA_SOURCE_SUSPENDED)
        pa_sink_input_cork(u->sink_input, false);

    if (u->sink_input->sink->state != PA_SINK_SUSPENDED)
        pa_source_output_cork(u->source_output, false);

    update_adjust_timer(u);

    pa_modargs_free(ma);
    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    pa__done(m);

    return -1;
}
1934 
/* Module teardown, called from main thread. Safe to call with a
 * partially initialized userdata (used as the failure path of
 * pa__init()), hence the NULL checks on each member. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    if (!(u = m->userdata))
        return;

    /* Disconnect and destroy the streams first */
    teardown(u);

    if (u->memblockq)
        pa_memblockq_free(u->memblockq);

    if (u->asyncmsgq)
        pa_asyncmsgq_unref(u->asyncmsgq);

    if (u->msg)
        loopback_msg_unref(u->msg);

    pa_xfree(u);
}
1956