1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2009 Intel Corporation
5 Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <stdio.h>
26
27 #include <pulse/xmalloc.h>
28
29 #include <pulsecore/sink-input.h>
30 #include <pulsecore/module.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/namereg.h>
33 #include <pulsecore/log.h>
34 #include <pulsecore/core-util.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38
/* Module metadata declared through the PA_MODULE_* macros: authors,
 * description, version (taken from PACKAGE_VERSION), the load-once flag
 * (disabled here) and the usage string.
 *
 * The usage string lists one "name=<meaning>" entry per module argument.
 * The argument names used here are the same 17 names that appear in
 * valid_modargs[]. */
PA_MODULE_AUTHOR("Pierre-Louis Bossart, Georg Chini");
PA_MODULE_DESCRIPTION("Loopback from source to sink");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE(
        "source=<source to connect to> "
        "sink=<sink to connect to> "
        "adjust_time=<how often to readjust rates in s> "
        "latency_msec=<latency in ms> "
        "max_latency_msec=<maximum latency in ms> "
        "log_interval=<how often to log in s> "
        "fast_adjust_threshold_msec=<threshold for fast adjust in ms> "
        "adjust_threshold_usec=<threshold for latency adjustment in usec> "
        "format=<sample format> "
        "rate=<sample rate> "
        "channels=<number of channels> "
        "channel_map=<channel map> "
        "sink_input_properties=<proplist> "
        "source_output_properties=<proplist> "
        "source_dont_move=<boolean> "
        "sink_dont_move=<boolean> "
        "remix=<remix channels?> ");
61
/* Default end-to-end latency in milliseconds.
 * NOTE(review): meaning taken from the name; the use site is not in this
 * part of the file. */
#define DEFAULT_LATENCY_MSEC 200

/* Coefficient of the low-pass stages: used twice in rate_controller() to form
 * the 2nd order drift filter, and in adjust_rates() to smooth the latency
 * error and the latency variance. */
#define FILTER_PARAMETER 0.125

/* Default for the adjust_threshold_usec argument, in microseconds.
 * NOTE(review): meaning taken from the name; the use site is not in this
 * part of the file. */
#define DEFAULT_ADJUST_THRESHOLD_USEC 250

/* 32 MiB. NOTE(review): presumably the maximum length of the loopback
 * memblockq; the queue is created outside this part of the file. */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*32)

/* Lower bound (2.5 ms, expressed in usec) applied to the minimum source and
 * sink latency in update_latency_boundaries(). Note that this expands to a
 * floating point value. */
#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)

/* One second, in usec. NOTE(review): presumably the default for the
 * adjust_time argument; the use site is not in this part of the file. */
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)

/* Forward typedef so that struct userdata can hold a loopback_msg pointer
 * before struct loopback_msg is defined. */
typedef struct loopback_msg loopback_msg;
75
/* Per-instance state of the loopback module.
 *
 * The fields are grouped by the thread that uses them: most are touched from
 * the main thread only, send_counter belongs to the input (source output)
 * thread and output_thread_info to the output (sink input) thread.
 * latency_snapshot is filled by the snapshot message handlers and evaluated
 * by adjust_rates(). */
struct userdata {
    pa_core *core;
    pa_module *module;

    /* Message object of this instance; teardown() marks it as dead */
    loopback_msg *msg;

    pa_sink_input *sink_input;
    pa_source_output *source_output;

    /* Queue from the source output (sending end) to the sink input */
    pa_asyncmsgq *asyncmsgq;
    /* Audio queued between source and sink, see memblockq_adjust() */
    pa_memblockq *memblockq;

    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    /* Timer that fires time_callback(), managed by enable_adjust_timer() */
    pa_time_event *time_event;

    /* Variables used to calculate the average time between
     * subsequent calls of adjust_rates() */
    pa_usec_t adjust_time_stamp;
    pa_usec_t real_adjust_time;
    pa_usec_t real_adjust_time_sum;

    /* Values from command line configuration */
    pa_usec_t latency;
    pa_usec_t max_latency;
    pa_usec_t adjust_time;
    pa_usec_t fast_adjust_threshold;
    uint32_t adjust_threshold;
    uint32_t log_interval;

    /* Latency boundaries and current values */
    pa_usec_t min_source_latency;
    pa_usec_t max_source_latency;
    pa_usec_t min_sink_latency;
    pa_usec_t max_sink_latency;
    pa_usec_t configured_sink_latency;
    pa_usec_t configured_source_latency;
    int64_t source_latency_offset;
    int64_t sink_latency_offset;
    /* Computed by update_minimum_latency() */
    pa_usec_t minimum_latency;

    /* State variables of the latency controller */
    int32_t last_latency_difference;
    int64_t last_source_latency_offset;
    int64_t last_sink_latency_offset;
    int64_t next_latency_with_drift;
    int64_t next_latency_at_optimum_rate_with_drift;

    /* Filter variables used for the 2nd order filter in rate_controller() */
    double drift_filter;
    double drift_compensation_rate;

    /* Variables for Kalman filter and error tracking */
    double latency_variance;
    double kalman_variance;
    double latency_error;

    /* Lower latency limit found by underruns; 0 means no limit was found yet */
    pa_usec_t underrun_latency_limit;

    /* Various counters */
    uint32_t iteration_counter;
    uint32_t underrun_counter;
    uint32_t adjust_counter;
    uint32_t target_latency_cross_counter;
    uint32_t log_counter;

    /* Various booleans */
    bool fixed_alsa_source;
    bool source_sink_changed;
    bool underrun_occured;
    bool source_latency_offset_changed;
    bool sink_latency_offset_changed;
    bool initial_adjust_pending;

    /* Used for sink input and source output snapshots */
    struct {
        int64_t send_counter;
        int64_t source_latency;
        pa_usec_t source_timestamp;

        int64_t recv_counter;
        size_t loopback_memblockq_length;
        int64_t sink_latency;
        pa_usec_t sink_timestamp;
    } latency_snapshot;

    /* Input thread variable: bytes pushed minus bytes rewound, maintained by
     * the source output push and rewind callbacks */
    int64_t send_counter;

    /* Output thread variables */
    struct {
        int64_t recv_counter;
        pa_usec_t effective_source_latency;

        /* Copied from main thread */
        pa_usec_t minimum_latency;

        /* Various booleans */
        bool in_pop;
        bool pop_called;
        bool pop_adjust;
        bool first_pop_done;
        bool push_called;
    } output_thread_info;
};
182
/* Message object of a loopback instance. It derives from pa_msgobject and
 * carries a back pointer to the module state. */
struct loopback_msg {
    pa_msgobject parent;
    struct userdata *userdata;
    /* Set to true by teardown().
     * NOTE(review): presumably checked by the message handler to ignore
     * messages that arrive after teardown; the handler is not in this part
     * of the file. */
    bool dead;
};

PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
/* Checked cast from a generic object to loopback_msg */
#define LOOPBACK_MSG(o) (loopback_msg_cast(o))
191
/* NULL-terminated list of the argument names this module accepts. The names
 * are the same as the ones documented in the PA_MODULE_USAGE() string. */
static const char* const valid_modargs[] = {
    "source",
    "sink",
    "adjust_time",
    "latency_msec",
    "max_latency_msec",
    "log_interval",
    "fast_adjust_threshold_msec",
    "adjust_threshold_usec",
    "format",
    "rate",
    "channels",
    "channel_map",
    "sink_input_properties",
    "source_output_properties",
    "source_dont_move",
    "sink_dont_move",
    "remix",
    NULL,
};
212
/* Module specific message codes addressed to the sink input. Numbering starts
 * at PA_SINK_INPUT_MESSAGE_MAX. */
enum {
    /* Posted by source_output_push_cb() with the pushed chunk */
    SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
    /* Posted by source_output_process_rewind_cb() with the rewound byte count */
    SINK_INPUT_MESSAGE_REWIND,
    /* Sent by time_callback() before adjust_rates() is run */
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
    SINK_INPUT_MESSAGE_SOURCE_CHANGED,
    SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
    /* Sent by update_minimum_latency() with the new minimum latency */
    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
    /* Sent by adjust_rates() when the fast adjust threshold is exceeded */
    SINK_INPUT_MESSAGE_FAST_ADJUST,
};

/* Module specific message codes addressed to the source output. Numbering
 * starts at PA_SOURCE_OUTPUT_MESSAGE_MAX. */
enum {
    /* Sent by time_callback() before adjust_rates() is run */
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
};

/* Message codes of the loopback message object (loopback_msg).
 * NOTE(review): senders and handler are not in this part of the file. */
enum {
    LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
    LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
    LOOPBACK_MESSAGE_UNDERRUN,
    LOOPBACK_MESSAGE_ADJUST_DONE,
};
233
234 static void enable_adjust_timer(struct userdata *u, bool enable);
235
236 /* Called from main context */
teardown(struct userdata * u)237 static void teardown(struct userdata *u) {
238 pa_assert(u);
239 pa_assert_ctl_context();
240
241 u->adjust_time = 0;
242 enable_adjust_timer(u, false);
243
244 if (u->msg)
245 u->msg->dead = true;
246
247 /* Handling the asyncmsgq between the source output and the sink input
248 * requires some care. When the source output is unlinked, nothing needs
249 * to be done for the asyncmsgq, because the source output is the sending
250 * end. But when the sink input is unlinked, we should ensure that the
251 * asyncmsgq is emptied, because the messages in the queue hold references
252 * to the sink input. Also, we need to ensure that new messages won't be
253 * written to the queue after we have emptied it.
254 *
255 * Emptying the queue can be done in the state_change() callback of the
256 * sink input, when the new state is "unlinked".
257 *
258 * Preventing new messages from being written to the queue can be achieved
259 * by unlinking the source output before unlinking the sink input. There
260 * are no other writers for that queue, so this is sufficient. */
261
262 if (u->source_output) {
263 pa_source_output_unlink(u->source_output);
264 pa_source_output_unref(u->source_output);
265 u->source_output = NULL;
266 }
267
268 if (u->sink_input) {
269 pa_sink_input_unlink(u->sink_input);
270 pa_sink_input_unref(u->sink_input);
271 u->sink_input = NULL;
272 }
273 }
274
/* Rate controller, called from main context.
 * - maximum deviation from optimum rate for P-controller is less than 1%
 * - P-controller step size is limited to 2.01‰
 * - will calculate an optimum rate
 *
 * Parameters:
 *   u          module state. Reads real_adjust_time, adjust_threshold,
 *              target_latency_cross_counter, last_latency_difference and the
 *              underrun/changed flags; updates drift_filter and
 *              drift_compensation_rate.
 *   base_rate  reference rate in Hz (adjust_rates() passes the sample rate of
 *              the source output)
 *   old_rate   rate currently in use (adjust_rates() passes the sample rate
 *              of the sink input)
 *   latency_difference_at_optimum_rate
 *              deviation from the target latency in usec, used by the
 *              P-controller
 *   latency_difference_at_base_rate
 *              deviation from the target latency in usec, used together with
 *              u->last_latency_difference for the drift estimate
 *
 * Returns the new rate, limited to the range from 99% to 101% of base_rate
 * (both limits computed with integer arithmetic). */
static uint32_t rate_controller(
                struct userdata *u,
                uint32_t base_rate, uint32_t old_rate,
                int32_t latency_difference_at_optimum_rate,
                int32_t latency_difference_at_base_rate) {

    double new_rate, new_rate_1, new_rate_2;
    double min_cycles_1, min_cycles_2, drift_rate, latency_drift, controller_weight, min_weight;
    uint32_t base_rate_with_drift;

    /* Base rate corrected by the current drift estimate, truncated to an integer */
    base_rate_with_drift = (int)(base_rate + u->drift_compensation_rate);

    /* If we are less than 2‰ away from the optimum rate, lower weight of the
     * P-controller. The weight is determined by the fact that a correction
     * of 0.5 Hz needs to be applied by the controller when the latency
     * difference gets larger than the threshold. The weight follows
     * from the definition of the controller. The minimum will only
     * be reached when one adjust threshold away from the target. Start
     * using the weight after the target latency has been reached for the
     * second time to accelerate initial convergence. The second time has
     * been chosen because it takes a while before the smoother returns
     * reliable latencies. */
    controller_weight = 1;
    min_weight = PA_CLAMP(0.5 / (double)base_rate * (100.0 + (double)u->real_adjust_time / u->adjust_threshold), 0, 1.0);
    if ((double)abs((int)(old_rate - base_rate_with_drift)) / base_rate_with_drift < 0.002 && u->target_latency_cross_counter >= 2)
        controller_weight = PA_CLAMP((double)abs(latency_difference_at_optimum_rate) / u->adjust_threshold * min_weight, min_weight, 1.0);

    /* Calculate next rate that is not more than 2‰ away from the last rate */
    min_cycles_1 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.002 + 1;
    new_rate_1 = old_rate + base_rate * (double)latency_difference_at_optimum_rate / min_cycles_1 / u->real_adjust_time;

    /* Calculate best rate to correct the current latency offset, limit at
     * 1% difference from base_rate */
    min_cycles_2 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.01 + 1;
    new_rate_2 = (double)base_rate * (1.0 + controller_weight * latency_difference_at_optimum_rate / min_cycles_2 / u->real_adjust_time);

    /* Choose the rate that is nearer to base_rate unless we are already near
     * to the desired latency and rate. The distances are compared after
     * truncation to int. */
    if (abs((int)(new_rate_1 - base_rate)) < abs((int)(new_rate_2 - base_rate)) && controller_weight > 0.99)
        new_rate = new_rate_1;
    else
        new_rate = new_rate_2;

    /* Calculate rate difference between source and sink. Skip calculation
     * after a source/sink change, an underrun or latency offset change */

    if (!u->underrun_occured && !u->source_sink_changed && !u->source_latency_offset_changed && !u->sink_latency_offset_changed) {
        /* Latency difference between last iterations */
        latency_drift = latency_difference_at_base_rate - u->last_latency_difference;

        /* Calculate frequency difference between source and sink */
        drift_rate = latency_drift * old_rate / u->real_adjust_time + old_rate - base_rate;

        /* The maximum accepted sample rate difference between source and
         * sink is 1% of the base rate. If the result is larger, something
         * went wrong, so do not use it. Pass in 0 instead to allow the
         * filter to decay. */
        if (abs((int)drift_rate) > base_rate / 100)
            drift_rate = 0;

        /* 2nd order lowpass filter */
        u->drift_filter = (1 - FILTER_PARAMETER) * u->drift_filter + FILTER_PARAMETER * drift_rate;
        u->drift_compensation_rate =  (1 - FILTER_PARAMETER) * u->drift_compensation_rate + FILTER_PARAMETER * u->drift_filter;
    }

    /* Use drift compensation. Though not likely, the rate might exceed the maximum allowed rate now.
     * The added 0.5 makes the final conversion to int round to nearest for positive rates. */
    new_rate = new_rate + u->drift_compensation_rate + 0.5;

    if (new_rate > base_rate * 101 / 100)
        return base_rate * 101 / 100;
    else if (new_rate < base_rate * 99 / 100)
        return base_rate * 99 / 100;
    else
        return (int)new_rate;
}
355
356 /* Called from main thread.
357 * It has been a matter of discussion how to correctly calculate the minimum
358 * latency that module-loopback can deliver with a given source and sink.
359 * The calculation has been placed in a separate function so that the definition
360 * can easily be changed. The resulting estimate is not very exact because it
361 * depends on the reported latency ranges. In cases were the lower bounds of
362 * source and sink latency are not reported correctly (USB) the result will
363 * be wrong. */
update_minimum_latency(struct userdata * u,pa_sink * sink,bool print_msg)364 static void update_minimum_latency(struct userdata *u, pa_sink *sink, bool print_msg) {
365
366 if (u->underrun_latency_limit)
367 /* If we already detected a real latency limit because of underruns, use it */
368 u->minimum_latency = u->underrun_latency_limit;
369
370 else {
371 /* Calculate latency limit from latency ranges */
372
373 u->minimum_latency = u->min_sink_latency;
374 if (u->fixed_alsa_source)
375 /* If we are using an alsa source with fixed latency, we will get a wakeup when
376 * one fragment is filled, and then we empty the source buffer, so the source
377 * latency never grows much beyond one fragment (assuming that the CPU doesn't
378 * cause a bottleneck). */
379 u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
380
381 else
382 /* In all other cases the source will deliver new data at latest after one source latency.
383 * Make sure there is enough data available that the sink can keep on playing until new
384 * data is pushed. */
385 u->minimum_latency += u->min_source_latency;
386
387 /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
388 u->minimum_latency *= 1.1;
389
390 /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
391 u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
392 }
393
394 /* Add the latency offsets */
395 if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency)
396 u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
397 else
398 u->minimum_latency = 0;
399
400 /* If the sink is valid, send a message to update the minimum latency to
401 * the output thread, else set the variable directly */
402 if (sink)
403 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY, NULL, u->minimum_latency, NULL);
404 else
405 u->output_thread_info.minimum_latency = u->minimum_latency;
406
407 if (print_msg) {
408 pa_log_info("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
409 if (u->latency < u->minimum_latency)
410 pa_log_warn("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead", (double)u->latency / PA_USEC_PER_MSEC);
411 }
412 }
413
/* Called from main context.
 *
 * One iteration of the latency controller. Evaluates the data stored in
 * u->latency_snapshot (time_callback() requests fresh snapshots right before
 * calling this function), and then
 *  - raises the latency limit if more than two underruns were counted,
 *  - maintains the average time between iterations (real_adjust_time),
 *  - computes the current end-to-end latency and a filtered latency,
 *  - either requests a fast adjust from the sink input (samples are skipped
 *    or inserted) and returns, or asks rate_controller() for a new rate,
 *  - stores the prediction for the next iteration and sets the new rate on
 *    the sink input. */
static void adjust_rates(struct userdata *u) {
    size_t buffer;
    uint32_t old_rate, base_rate, new_rate, run_hours;
    int32_t latency_difference;
    pa_usec_t current_buffer_latency, snapshot_delay;
    int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
    pa_usec_t final_latency, now, time_passed;
    double filtered_latency, current_latency_error, latency_correction, base_rate_with_drift;

    pa_assert(u);
    pa_assert_ctl_context();

    /* Runtime and counters since last change of source or sink
     * or source/sink latency */
    run_hours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600;
    u->iteration_counter +=1;

    /* If we are seeing underruns then the latency is too small */
    if (u->underrun_counter > 2) {
        pa_usec_t target_latency;

        /* Go 5 ms above the latency currently aimed at */
        target_latency = PA_MAX(u->latency, u->minimum_latency) + 5 * PA_USEC_PER_MSEC;

        /* The limit is stored without the latency offsets because
         * update_minimum_latency() adds them again */
        if (u->max_latency == 0 || target_latency < u->max_latency) {
            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
            pa_log_warn("Too many underruns, increasing latency to %0.2f ms", (double)target_latency / PA_USEC_PER_MSEC);
        } else {
            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency, u->sink_latency_offset + u->source_latency_offset);
            pa_log_warn("Too many underruns, configured maximum latency of %0.2f ms is reached", (double)u->max_latency / PA_USEC_PER_MSEC);
            pa_log_warn("Consider increasing the max_latency_msec");
        }

        update_minimum_latency(u, u->sink_input->sink, false);
        u->underrun_counter = 0;
    }

    /* Allow one underrun per hour: whenever the number of full run hours
     * increases, the underrun counter is decremented (not below 0) */
    if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600 > run_hours) {
        u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
        pa_log_info("Underrun counter: %u", u->underrun_counter);
    }

    /* Calculate real adjust time if source or sink did not change and if the system has
     * not been suspended. If the time between two calls is more than 5% longer than the
     * configured adjust time, we assume that the system has been sleeping and skip the
     * calculation for this iteration. When source or sink changed or the system has been
     * sleeping, we need to reset the parameters for drift compensation. */
    now = pa_rtclock_now();
    time_passed = now - u->adjust_time_stamp;
    if (!u->source_sink_changed && time_passed < u->adjust_time * 1.05) {
        u->adjust_counter++;
        u->real_adjust_time_sum += time_passed;
        u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
    } else {
        u->drift_compensation_rate = 0;
        u->drift_filter = 0;
        /* Ensure that source_sink_changed is set, so that the Kalman filter parameters
         * will also be reset. */
        u->source_sink_changed = true;
    }
    u->adjust_time_stamp = now;

    /* Rates and latencies */
    old_rate = u->sink_input->sample_spec.rate;
    base_rate = u->source_output->sample_spec.rate;

    /* Bytes in the loopback queue, corrected by the bytes that were sent but
     * not yet received (or the other way round) at snapshot time */
    buffer = u->latency_snapshot.loopback_memblockq_length;
    if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
        buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
    else
        buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));

    current_buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
    /* Time between the two snapshots; unsigned arithmetic, so this relies on
     * the source timestamp not being older than the sink timestamp */
    snapshot_delay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
    current_source_sink_latency = u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency - snapshot_delay;

    /* Current latency */
    current_latency = current_source_sink_latency + current_buffer_latency;

    /* Latency at optimum rate and latency difference */
    latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / (u->drift_compensation_rate + base_rate);

    final_latency = PA_MAX(u->latency, u->minimum_latency);
    latency_difference = (int32_t)(current_latency - final_latency);

    /* Do not filter or calculate error if source or sink changed or if there was an underrun */
    if (u->source_sink_changed || u->underrun_occured) {
        /* Initial conditions are very unsure, so use a high variance */
        u->kalman_variance = 10000000;
        filtered_latency = latency_at_optimum_rate;
        u->next_latency_at_optimum_rate_with_drift = latency_at_optimum_rate;
        u->next_latency_with_drift = current_latency;

    } else {
        /* Correct predictions if one of the latency offsets changed between iterations */
        u->next_latency_at_optimum_rate_with_drift += u->source_latency_offset - u->last_source_latency_offset;
        u->next_latency_at_optimum_rate_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
        u->next_latency_with_drift += u->source_latency_offset - u->last_source_latency_offset;
        u->next_latency_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
        /* Low pass filtered latency error. This value reflects how well the measured values match the prediction. */
        u->latency_error = (1 - FILTER_PARAMETER) * u->latency_error + FILTER_PARAMETER * (double)abs((int32_t)(current_latency - u->next_latency_with_drift));
        /* Low pass filtered latency variance */
        current_latency_error = (double)abs((int32_t)(latency_at_optimum_rate - u->next_latency_at_optimum_rate_with_drift));
        u->latency_variance = (1.0 - FILTER_PARAMETER) * u->latency_variance + FILTER_PARAMETER * current_latency_error * current_latency_error;
        /* Kalman filter: variance weighted mean of measurement and prediction */
        filtered_latency = (latency_at_optimum_rate * u->kalman_variance + u->next_latency_at_optimum_rate_with_drift * u->latency_variance) / (u->kalman_variance + u->latency_variance);
        u->kalman_variance = u->kalman_variance * u->latency_variance / (u->kalman_variance + u->latency_variance) + u->latency_variance / 4 + 200;
    }

    /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
    if (u->fast_adjust_threshold > 0 && abs(latency_difference) > u->fast_adjust_threshold) {
        pa_log_debug ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.", u->fast_adjust_threshold / PA_USEC_PER_MSEC);

        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, current_source_sink_latency, NULL);

        /* Skip real adjust time calculation and reset drift compensation parameters on next iteration. */
        u->source_sink_changed = true;

        /* We probably need to adjust again, reset cross_counter. */
        u->target_latency_cross_counter = 0;
        /* The rate is left untouched in this iteration */
        return;
    }

    /* Calculate new rate */
    new_rate = rate_controller(u, base_rate, old_rate, (int32_t)(filtered_latency - final_latency), latency_difference);

    /* Log every log_interval iterations if the log_interval parameter is set */
    if (u->log_interval != 0) {
        u->log_counter--;
        if (u->log_counter == 0) {
            pa_log_debug("Loopback status %s to %s:\n Source latency: %0.2f ms\n Buffer: %0.2f ms\n Sink latency: %0.2f ms\n End-to-end latency: %0.2f ms\n"
                         " Deviation from target latency at optimum rate: %0.2f usec\n Average prediction error: ± %0.2f usec\n Optimum rate: %0.2f Hz\n Deviation from base rate: %i Hz",
                        u->source_output->source->name,
                        u->sink_input->sink->name,
                        (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
                        (double) current_buffer_latency / PA_USEC_PER_MSEC,
                        (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
                        (double) current_latency / PA_USEC_PER_MSEC,
                        (double) latency_at_optimum_rate - final_latency,
                        (double) u->latency_error,
                        u->drift_compensation_rate + base_rate,
                        (int32_t)(new_rate - base_rate));
            u->log_counter = u->log_interval;
        }
    }

    /* If the latency difference changed sign, we have crossed the target latency. */
    if ((int64_t)latency_difference * u->last_latency_difference < 0)
        u->target_latency_cross_counter++;

    /* Save current latency difference at new rate for next cycle and reset flags */
    u->last_latency_difference = current_source_sink_latency + current_buffer_latency * old_rate / new_rate - final_latency;

    /* Set variables that may change between calls of adjust_rate() */
    u->source_sink_changed = false;
    u->underrun_occured = false;
    u->last_source_latency_offset = u->source_latency_offset;
    u->last_sink_latency_offset = u->sink_latency_offset;
    u->source_latency_offset_changed = false;
    u->sink_latency_offset_changed = false;

    /* Prediction of next latency */

    /* Evaluate optimum rate */
    base_rate_with_drift = u->drift_compensation_rate + base_rate;

    /* Latency correction on next iteration */
    latency_correction = (base_rate_with_drift - new_rate) * (int64_t)u->real_adjust_time / new_rate;

    if ((int)new_rate != (int)base_rate_with_drift || new_rate != old_rate) {
        /* While we are correcting, the next latency is determined by the current value and the difference
         * between the new sampling rate and the base rate*/
        u->next_latency_with_drift = current_latency + latency_correction + ((double)old_rate / new_rate - 1) * current_buffer_latency;
        u->next_latency_at_optimum_rate_with_drift = filtered_latency + latency_correction * new_rate / base_rate_with_drift;

    } else {
        /* We are in steady state, now only the fractional drift should matter.
         * To make sure that we do not drift away due to errors in the fractional
         * drift, use a running average of the measured and predicted values */
        u->next_latency_with_drift = (filtered_latency + u->next_latency_with_drift) / 2.0 + (1.0 - (double)(int)base_rate_with_drift / base_rate_with_drift) * (int64_t)u->real_adjust_time;

        /* We are at the optimum rate, so nothing to correct */
        u->next_latency_at_optimum_rate_with_drift = u->next_latency_with_drift;
    }

    /* Set rate */
    pa_sink_input_set_rate(u->sink_input, new_rate);
}
603
604 /* Called from main context */
time_callback(pa_mainloop_api * a,pa_time_event * e,const struct timeval * t,void * userdata)605 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
606 struct userdata *u = userdata;
607
608 pa_assert(u);
609 pa_assert(a);
610 pa_assert(u->time_event == e);
611
612 /* Restart timer right away */
613 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
614
615 /* If the initial latency adjustment has not been done yet, we have to skip
616 * adjust_rates(). The estimation of the optimum rate cannot be done in that
617 * situation */
618 if (u->initial_adjust_pending)
619 return;
620
621 /* Get sink and source latency snapshot */
622 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
623 pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
624
625 adjust_rates(u);
626 }
627
628 /* Called from main context
629 * When source or sink changes, give it a third of a second to settle down, then call adjust_rates for the first time */
enable_adjust_timer(struct userdata * u,bool enable)630 static void enable_adjust_timer(struct userdata *u, bool enable) {
631 if (enable) {
632 if (!u->adjust_time)
633 return;
634 if (u->time_event)
635 u->core->mainloop->time_free(u->time_event);
636
637 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + 333 * PA_USEC_PER_MSEC, time_callback, u);
638 } else {
639 if (!u->time_event)
640 return;
641
642 u->core->mainloop->time_free(u->time_event);
643 u->time_event = NULL;
644 }
645 }
646
647 /* Called from main context */
update_adjust_timer(struct userdata * u)648 static void update_adjust_timer(struct userdata *u) {
649 if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED)
650 enable_adjust_timer(u, false);
651 else
652 enable_adjust_timer(u, true);
653 }
654
655 /* Called from main thread
656 * Calculates minimum and maximum possible latency for source and sink */
update_latency_boundaries(struct userdata * u,pa_source * source,pa_sink * sink)657 static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
658 const char *s;
659
660 if (source) {
661 /* Source latencies */
662 u->fixed_alsa_source = false;
663 if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
664 pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
665 else {
666 u->min_source_latency = pa_source_get_fixed_latency(source);
667 u->max_source_latency = u->min_source_latency;
668 if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
669 if (pa_streq(s, "alsa"))
670 u->fixed_alsa_source = true;
671 }
672 }
673 /* Source offset */
674 u->source_latency_offset = source->port_latency_offset;
675
676 /* Latencies below 2.5 ms cause problems, limit source latency if possible */
677 if (u->max_source_latency >= MIN_DEVICE_LATENCY)
678 u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
679 else
680 u->min_source_latency = u->max_source_latency;
681 }
682
683 if (sink) {
684 /* Sink latencies */
685 if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
686 pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
687 else {
688 u->min_sink_latency = pa_sink_get_fixed_latency(sink);
689 u->max_sink_latency = u->min_sink_latency;
690 }
691 /* Sink offset */
692 u->sink_latency_offset = sink->port_latency_offset;
693
694 /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
695 if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
696 u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
697 else
698 u->min_sink_latency = u->max_sink_latency;
699 }
700
701 update_minimum_latency(u, sink, true);
702 }
703
704 /* Called from output context
705 * Sets the memblockq to the configured latency corrected by latency_offset_usec */
memblockq_adjust(struct userdata * u,int64_t latency_offset_usec,bool allow_push)706 static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
707 size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
708 int64_t requested_buffer_latency;
709 pa_usec_t final_latency, requested_sink_latency;
710
711 final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
712
713 /* If source or sink have some large negative latency offset, we might want to
714 * hold more than final_latency in the memblockq */
715 requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;
716
717 /* Keep at least one sink latency in the queue to make sure that the sink
718 * never underruns initially */
719 requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
720 if (requested_buffer_latency < (int64_t)requested_sink_latency)
721 requested_buffer_latency = requested_sink_latency;
722
723 requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
724 current_memblockq_length = pa_memblockq_get_length(u->memblockq);
725
726 if (current_memblockq_length > requested_memblockq_length) {
727 /* Drop audio from queue */
728 buffer_correction = current_memblockq_length - requested_memblockq_length;
729 pa_log_info("Dropping %" PRIu64 " usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
730 pa_memblockq_drop(u->memblockq, buffer_correction);
731
732 } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
733 /* Add silence to queue */
734 buffer_correction = requested_memblockq_length - current_memblockq_length;
735 pa_log_info("Adding %" PRIu64 " usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
736 pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
737 }
738 }
739
740 /* Called from input thread context */
source_output_push_cb(pa_source_output * o,const pa_memchunk * chunk)741 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
742 struct userdata *u;
743 pa_usec_t push_time;
744 int64_t current_source_latency;
745
746 pa_source_output_assert_ref(o);
747 pa_source_output_assert_io_context(o);
748 pa_assert_se(u = o->userdata);
749
750 /* Send current source latency and timestamp with the message */
751 push_time = pa_rtclock_now();
752 current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
753 current_source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
754
755 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
756 u->send_counter += (int64_t) chunk->length;
757 }
758
759 /* Called from input thread context */
source_output_process_rewind_cb(pa_source_output * o,size_t nbytes)760 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
761 struct userdata *u;
762
763 pa_source_output_assert_ref(o);
764 pa_source_output_assert_io_context(o);
765 pa_assert_se(u = o->userdata);
766
767 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
768 u->send_counter -= (int64_t) nbytes;
769 }
770
771 /* Called from input thread context */
source_output_process_msg_cb(pa_msgobject * obj,int code,void * data,int64_t offset,pa_memchunk * chunk)772 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
773 struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
774
775 switch (code) {
776
777 case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
778 size_t length;
779
780 length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
781
782 u->latency_snapshot.send_counter = u->send_counter;
783 /* Add content of delay memblockq to the source latency */
784 u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
785 pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
786 /* Add resampler latency */
787 u->latency_snapshot.source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
788
789 u->latency_snapshot.source_timestamp = pa_rtclock_now();
790
791 return 0;
792 }
793 }
794
795 return pa_source_output_process_msg(obj, code, data, offset, chunk);
796 }
797
798 /* Called from main thread.
799 * Get current effective latency of the source. If the source is in use with
800 * smaller latency than the configured latency, it will continue running with
801 * the smaller value when the source output is switched to the source. */
update_effective_source_latency(struct userdata * u,pa_source * source,pa_sink * sink)802 static void update_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {
803 pa_usec_t effective_source_latency;
804
805 effective_source_latency = u->configured_source_latency;
806
807 if (source) {
808 effective_source_latency = pa_source_get_requested_latency(source);
809 if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
810 effective_source_latency = u->configured_source_latency;
811 }
812
813 /* If the sink is valid, send a message to the output thread, else set the variable directly */
814 if (sink)
815 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
816 else
817 u->output_thread_info.effective_source_latency = effective_source_latency;
818 }
819
820 /* Called from main thread.
821 * Set source output latency to one third of the overall latency if possible.
822 * The choice of one third is rather arbitrary somewhere between the minimum
823 * possible latency which would cause a lot of CPU load and half the configured
824 * latency which would quickly lead to underruns */
set_source_output_latency(struct userdata * u,pa_source * source)825 static void set_source_output_latency(struct userdata *u, pa_source *source) {
826 pa_usec_t latency, requested_latency;
827
828 requested_latency = u->latency / 3;
829
830 /* Normally we try to configure sink and source latency equally. If the
831 * sink latency cannot match the requested source latency try to set the
832 * source latency to a smaller value to avoid underruns */
833 if (u->min_sink_latency > requested_latency) {
834 latency = PA_MAX(u->latency, u->minimum_latency);
835 requested_latency = (latency - u->min_sink_latency) / 2;
836 }
837
838 latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency);
839 u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
840 if (u->configured_source_latency != requested_latency)
841 pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
842 }
843
/* Attach callback of the source output. Called from input thread context.
 *
 * Creates the rtpoll item for the write side of the module's asyncmsgq on
 * the rtpoll of the source the output is attached to and stores it in
 * u->rtpoll_item_write. The item is released in source_output_detach_cb(). */
static void source_output_attach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
            o->source->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);
}
857
858 /* Called from input thread context */
source_output_detach_cb(pa_source_output * o)859 static void source_output_detach_cb(pa_source_output *o) {
860 struct userdata *u;
861
862 pa_source_output_assert_ref(o);
863 pa_source_output_assert_io_context(o);
864 pa_assert_se(u = o->userdata);
865
866 if (u->rtpoll_item_write) {
867 pa_rtpoll_item_free(u->rtpoll_item_write);
868 u->rtpoll_item_write = NULL;
869 }
870 }
871
/* Kill callback of the source output. Called from main thread.
 *
 * Tears the loopback down via teardown() and then requests unloading of the
 * module (second argument of pa_module_unload_request() is true). */
static void source_output_kill_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* teardown() has to run before the unload request */
    teardown(u);
    pa_module_unload_request(u->module, true);
}
883
884 /* Called from main thread */
source_output_may_move_to_cb(pa_source_output * o,pa_source * dest)885 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
886 struct userdata *u;
887
888 pa_source_output_assert_ref(o);
889 pa_assert_ctl_context();
890 pa_assert_se(u = o->userdata);
891
892 if (!u->sink_input || !u->sink_input->sink)
893 return true;
894
895 return dest != u->sink_input->sink->monitor_source;
896 }
897
898 /* Called from main thread */
source_output_moving_cb(pa_source_output * o,pa_source * dest)899 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
900 struct userdata *u;
901 char *input_description;
902 const char *n;
903
904 if (!dest)
905 return;
906
907 pa_source_output_assert_ref(o);
908 pa_assert_ctl_context();
909 pa_assert_se(u = o->userdata);
910
911 input_description = pa_sprintf_malloc("Loopback of %s",
912 pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
913 pa_sink_input_set_property(u->sink_input, PA_PROP_MEDIA_NAME, input_description);
914 pa_xfree(input_description);
915
916 if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
917 pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);
918
919 /* Set latency and calculate latency limits */
920 u->underrun_latency_limit = 0;
921 u->last_source_latency_offset = dest->port_latency_offset;
922 u->initial_adjust_pending = true;
923 update_latency_boundaries(u, dest, u->sink_input->sink);
924 set_source_output_latency(u, dest);
925 update_effective_source_latency(u, dest, u->sink_input->sink);
926
927 /* Uncork the sink input unless the destination is suspended for other
928 * reasons than idle. */
929 if (dest->state == PA_SOURCE_SUSPENDED)
930 pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE));
931 else
932 pa_sink_input_cork(u->sink_input, false);
933
934 update_adjust_timer(u);
935
936 /* Reset counters */
937 u->iteration_counter = 0;
938 u->underrun_counter = 0;
939
940 /* Reset booleans, latency error and counters */
941 u->source_sink_changed = true;
942 u->underrun_occured = false;
943 u->source_latency_offset_changed = false;
944 u->target_latency_cross_counter = 0;
945 u->log_counter = u->log_interval;
946 u->latency_error = 0;
947
948 /* Send a mesage to the output thread that the source has changed.
949 * If the sink is invalid here during a profile switching situation
950 * we can safely set push_called to false directly. */
951 if (u->sink_input->sink)
952 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
953 else
954 u->output_thread_info.push_called = false;
955
956 /* The sampling rate may be far away from the default rate if we are still
957 * recovering from a previous source or sink change, so reset rate to
958 * default before moving the source. */
959 pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
960 }
961
962 /* Called from main thread */
source_output_suspend_cb(pa_source_output * o,pa_source_state_t old_state,pa_suspend_cause_t old_suspend_cause)963 static void source_output_suspend_cb(pa_source_output *o, pa_source_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
964 struct userdata *u;
965 bool suspended;
966
967 pa_source_output_assert_ref(o);
968 pa_assert_ctl_context();
969 pa_assert_se(u = o->userdata);
970
971 /* State has not changed, nothing to do */
972 if (old_state == o->source->state)
973 return;
974
975 suspended = (o->source->state == PA_SOURCE_SUSPENDED);
976
977 /* If the source has been suspended, we need to handle this like
978 * a source change when the source is resumed */
979 if (suspended) {
980 if (u->sink_input->sink)
981 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
982 else
983 u->output_thread_info.push_called = false;
984
985 } else
986 /* Get effective source latency on unsuspend */
987 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
988
989 pa_sink_input_cork(u->sink_input, suspended);
990
991 update_adjust_timer(u);
992 }
993
/* Latency range callback of the source output. Called from input thread
 * context.
 *
 * Only notifies the main thread: LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED
 * is posted to the outq of the current thread message queue, addressed to
 * u->msg. The actual handling takes place in loopback_process_msg_cb(). */
static void update_source_latency_range_cb(pa_source_output *i) {
    struct userdata *u;

    pa_source_output_assert_ref(i);
    pa_source_output_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    /* Source latency may have changed */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
1005
1006 /* Called from output thread context */
sink_input_pop_cb(pa_sink_input * i,size_t nbytes,pa_memchunk * chunk)1007 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1008 struct userdata *u;
1009
1010 pa_sink_input_assert_ref(i);
1011 pa_sink_input_assert_io_context(i);
1012 pa_assert_se(u = i->userdata);
1013 pa_assert(chunk);
1014
1015 /* It seems necessary to handle outstanding push messages here, though it is not clear
1016 * why. Removing this part leads to underruns when low latencies are configured. */
1017 u->output_thread_info.in_pop = true;
1018 while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
1019 ;
1020 u->output_thread_info.in_pop = false;
1021
1022 /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
1023 * enabled. Disable them on second pop and enable the final adjustment during the
1024 * next push. The adjustment must be done on the next push, because there is no way
1025 * to retrieve the source latency here. We are waiting for the second pop, because
1026 * the first pop may be called before the sink is actually started. */
1027 if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
1028 u->output_thread_info.pop_adjust = true;
1029 u->output_thread_info.pop_called = true;
1030 }
1031 u->output_thread_info.first_pop_done = true;
1032
1033 if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
1034 pa_log_info("Could not peek into queue");
1035 return -1;
1036 }
1037
1038 chunk->length = PA_MIN(chunk->length, nbytes);
1039 pa_memblockq_drop(u->memblockq, chunk->length);
1040
1041 /* Adjust the memblockq to ensure that there is
1042 * enough data in the queue to avoid underruns. */
1043 if (!u->output_thread_info.push_called)
1044 memblockq_adjust(u, 0, true);
1045
1046 return 0;
1047 }
1048
/* Rewind callback of the sink input. Called from output thread context.
 *
 * Rewinds the read position of the loopback memblockq by nbytes. */
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_rewind(u->memblockq, nbytes);
}
1059
/* Message handler of the sink input. Called from output thread context.
 *
 * Messages handled here:
 *  - PA_SINK_INPUT_MESSAGE_GET_LATENCY: stores the length of the loopback
 *    memblockq (in usec) in *data and then continues with the default handler.
 *  - SINK_INPUT_MESSAGE_POST: a chunk pushed by source_output_push_cb(). data
 *    carries the source latency at push time, offset the time of the push.
 *  - SINK_INPUT_MESSAGE_REWIND: the source was rewound by offset bytes.
 *  - SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: fills in the sink side of
 *    u->latency_snapshot.
 *  - SINK_INPUT_MESSAGE_SOURCE_CHANGED: resets push_called.
 *  - SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY and
 *    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY: store offset in the matching
 *    field of u->output_thread_info.
 *  - SINK_INPUT_MESSAGE_FAST_ADJUST: adjusts the memblockq with offset as
 *    latency offset.
 *
 * All of these except GET_LATENCY return 0. Every other code, as well as
 * GET_LATENCY after its own handling, ends up in pa_sink_input_process_msg(). */
static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK_INPUT(obj)->userdata;

    pa_sink_input_assert_io_context(u->sink_input);

    switch (code) {

        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
            pa_usec_t *r = data;

            *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);

            /* Leave the switch so that the default handler below is called,
             * it will add in the extra latency added by the resampler */
            break;
        }

        case SINK_INPUT_MESSAGE_POST:

            pa_memblockq_push_align(u->memblockq, chunk);

            /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
             * are enabled. Disable them on first push and correct the memblockq. If pop
             * has not been called yet, wait until the pop_cb() requests the adjustment */
            if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust)) {
                int64_t time_delta;

                /* This is the source latency at the time push was called */
                time_delta = PA_PTR_TO_INT(data);
                /* Add the time between push and post */
                time_delta += pa_rtclock_now() - (pa_usec_t) offset;
                /* Add the sink and resampler latency */
                time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
                time_delta += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);

                /* The source latency report includes the audio in the chunk,
                 * but since we already pushed the chunk to the memblockq, we need
                 * to subtract the chunk size from the source latency so that it
                 * won't be counted towards both the memblockq latency and the
                 * source latency.
                 *
                 * Sometimes the alsa source reports way too low latency (might
                 * be a bug in the alsa source code). This seems to happen when
                 * there's an overrun. As an attempt to detect overruns, we
                 * check if the chunk size is larger than the configured source
                 * latency. If so, we assume that the source should have pushed
                 * a chunk whose size equals the configured latency, so we
                 * modify time_delta only by that amount, which makes
                 * memblockq_adjust() drop more data than it would otherwise.
                 * This seems to work quite well, but it's possible that the
                 * next push also contains too much data, and in that case the
                 * resulting latency will be wrong. */
                if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
                    time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
                else
                    time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);

                /* FIXME: We allow pushing silence here to fix up the latency. This
                 * might lead to a gap in the stream */
                memblockq_adjust(u, time_delta, true);

                /* Notify main thread when the initial adjustment is done.
                 * NOTE(review): the enclosing condition already requires
                 * pop_called, so this check looks redundant - confirm that
                 * memblockq_adjust() cannot change the flag. */
                if (u->output_thread_info.pop_called)
                    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_ADJUST_DONE, NULL, 0, NULL, NULL);

                u->output_thread_info.pop_adjust = false;
                u->output_thread_info.push_called = true;
            }

            /* If pop has not been called yet, make sure the latency does not grow too much.
             * Don't push any silence here, because we already have new data in the queue */
            if (!u->output_thread_info.pop_called)
                memblockq_adjust(u, 0, false);

            /* Is this the end of an underrun? Then let's start things
             * right-away */
            if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
                u->sink_input->thread_info.underrun_for > 0 &&
                pa_memblockq_is_readable(u->memblockq) &&
                u->output_thread_info.pop_called) {

                /* The main thread counts underruns, see LOOPBACK_MESSAGE_UNDERRUN */
                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN, NULL, 0, NULL, NULL);
                /* If called from within the pop callback skip the rewind */
                if (!u->output_thread_info.in_pop) {
                    pa_log_debug("Requesting rewind due to end of underrun.");
                    /* An underrun_for of (size_t) -1 is mapped to a rewind of 0 bytes */
                    pa_sink_input_request_rewind(u->sink_input,
                                                 (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
                                                 false, true, false);
                }
            }

            /* Counterpart of send_counter in source_output_push_cb() */
            u->output_thread_info.recv_counter += (int64_t) chunk->length;

            return 0;

        case SINK_INPUT_MESSAGE_REWIND:

            /* Do not try to rewind if no data was pushed yet */
            if (u->output_thread_info.push_called)
                pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);

            /* The counter is corrected even if the queue was not touched */
            u->output_thread_info.recv_counter -= offset;

            return 0;

        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
            size_t length;

            length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);

            u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
            u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
            /* Add content of render memblockq to sink latency */
            u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
                                               pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
            /* Add resampler latency */
            u->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);

            u->latency_snapshot.sink_timestamp = pa_rtclock_now();

            return 0;
        }

        case SINK_INPUT_MESSAGE_SOURCE_CHANGED:

            /* Re-enables the adjustments that are done until the first push */
            u->output_thread_info.push_called = false;

            return 0;

        case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:

            /* Sent by update_effective_source_latency() */
            u->output_thread_info.effective_source_latency = (pa_usec_t)offset;

            return 0;

        case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:

            u->output_thread_info.minimum_latency = (pa_usec_t)offset;

            return 0;

        case SINK_INPUT_MESSAGE_FAST_ADJUST:

            /* offset is passed on as the latency offset, pushing silence is allowed */
            memblockq_adjust(u, offset, true);

            return 0;
    }

    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
1211 /* Called from main thread.
1212 * Set sink input latency to one third of the overall latency if possible.
1213 * The choice of one third is rather arbitrary somewhere between the minimum
1214 * possible latency which would cause a lot of CPU load and half the configured
1215 * latency which would quickly lead to underruns. */
set_sink_input_latency(struct userdata * u,pa_sink * sink)1216 static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
1217 pa_usec_t latency, requested_latency;
1218
1219 requested_latency = u->latency / 3;
1220
1221 /* Normally we try to configure sink and source latency equally. If the
1222 * source latency cannot match the requested sink latency try to set the
1223 * sink latency to a smaller value to avoid underruns */
1224 if (u->min_source_latency > requested_latency) {
1225 latency = PA_MAX(u->latency, u->minimum_latency);
1226 requested_latency = (latency - u->min_source_latency) / 2;
1227 /* In the case of a fixed alsa source, u->minimum_latency is calculated from
1228 * the default fragment size while u->min_source_latency is the reported minimum
1229 * of the source latency (nr_of_fragments * fragment_size). This can lead to a
1230 * situation where u->minimum_latency < u->min_source_latency. We only fall
1231 * back to use the fragment size instead of min_source_latency if the calculation
1232 * above does not deliver a usable result. */
1233 if (u->fixed_alsa_source && u->min_source_latency >= latency)
1234 requested_latency = (latency - u->core->default_fragment_size_msec * PA_USEC_PER_MSEC) / 2;
1235 }
1236
1237 latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency);
1238 u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1239 if (u->configured_sink_latency != requested_latency)
1240 pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1241 }
1242
1243 /* Called from output thread context */
sink_input_attach_cb(pa_sink_input * i)1244 static void sink_input_attach_cb(pa_sink_input *i) {
1245 struct userdata *u;
1246
1247 pa_sink_input_assert_ref(i);
1248 pa_sink_input_assert_io_context(i);
1249 pa_assert_se(u = i->userdata);
1250
1251 u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1252 i->sink->thread_info.rtpoll,
1253 PA_RTPOLL_LATE,
1254 u->asyncmsgq);
1255
1256 pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
1257 pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
1258 }
1259
1260 /* Called from output thread context */
sink_input_detach_cb(pa_sink_input * i)1261 static void sink_input_detach_cb(pa_sink_input *i) {
1262 struct userdata *u;
1263
1264 pa_sink_input_assert_ref(i);
1265 pa_sink_input_assert_io_context(i);
1266 pa_assert_se(u = i->userdata);
1267
1268 if (u->rtpoll_item_read) {
1269 pa_rtpoll_item_free(u->rtpoll_item_read);
1270 u->rtpoll_item_read = NULL;
1271 }
1272 }
1273
/* Max rewind callback of the sink input. Called from output thread context.
 *
 * Sets the maxrewind value of the loopback memblockq to nbytes. */
static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_set_maxrewind(u->memblockq, nbytes);
}
1284
/* Max request callback of the sink input. Called from output thread context.
 *
 * Sets the prebuf value of the loopback memblockq to twice the new maximum
 * request size, the same factor sink_input_attach_cb() uses, and logs the
 * change. */
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
    pa_log_info("Max request changed");
}
1296
/* Kill callback of the sink input. Called from main thread.
 *
 * Tears the loopback down via teardown() and then requests unloading of the
 * module (second argument of pa_module_unload_request() is true). */
static void sink_input_kill_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_ctl_context();
    pa_assert_se(u = i->userdata);

    /* teardown() has to run before the unload request */
    teardown(u);
    pa_module_unload_request(u->module, true);
}
1308
1309 /* Called from the output thread context */
sink_input_state_change_cb(pa_sink_input * i,pa_sink_input_state_t state)1310 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1311 struct userdata *u;
1312
1313 pa_sink_input_assert_ref(i);
1314 pa_assert_se(u = i->userdata);
1315
1316 if (state == PA_SINK_INPUT_UNLINKED)
1317 pa_asyncmsgq_flush(u->asyncmsgq, false);
1318 }
1319
1320 /* Called from main thread */
sink_input_moving_cb(pa_sink_input * i,pa_sink * dest)1321 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1322 struct userdata *u;
1323 char *output_description;
1324 const char *n;
1325
1326 if (!dest)
1327 return;
1328
1329 pa_sink_input_assert_ref(i);
1330 pa_assert_ctl_context();
1331 pa_assert_se(u = i->userdata);
1332
1333 output_description = pa_sprintf_malloc("Loopback to %s",
1334 pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1335 pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_NAME, output_description);
1336 pa_xfree(output_description);
1337
1338 if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
1339 pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);
1340
1341 /* Set latency and calculate latency limits */
1342 u->underrun_latency_limit = 0;
1343 u->last_sink_latency_offset = dest->port_latency_offset;
1344 u->initial_adjust_pending = true;
1345 update_latency_boundaries(u, NULL, dest);
1346 set_sink_input_latency(u, dest);
1347 update_effective_source_latency(u, u->source_output->source, dest);
1348
1349 /* Uncork the source output unless the destination is suspended for other
1350 * reasons than idle */
1351 if (dest->state == PA_SINK_SUSPENDED)
1352 pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE));
1353 else
1354 pa_source_output_cork(u->source_output, false);
1355
1356 update_adjust_timer(u);
1357
1358 /* Reset counters */
1359 u->iteration_counter = 0;
1360 u->underrun_counter = 0;
1361
1362 /* Reset booleans, latency error and counters */
1363 u->source_sink_changed = true;
1364 u->underrun_occured = false;
1365 u->sink_latency_offset_changed = false;
1366 u->target_latency_cross_counter = 0;
1367 u->log_counter = u->log_interval;
1368 u->latency_error = 0;
1369
1370 u->output_thread_info.pop_called = false;
1371 u->output_thread_info.first_pop_done = false;
1372
1373 /* Sample rate may be far away from the default rate if we are still
1374 * recovering from a previous source or sink change, so reset rate to
1375 * default before moving the sink. */
1376 pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
1377 }
1378
1379 /* Called from main thread */
sink_input_may_move_to_cb(pa_sink_input * i,pa_sink * dest)1380 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1381 struct userdata *u;
1382
1383 pa_sink_input_assert_ref(i);
1384 pa_assert_ctl_context();
1385 pa_assert_se(u = i->userdata);
1386
1387 if (!u->source_output || !u->source_output->source)
1388 return true;
1389
1390 return dest != u->source_output->source->monitor_of;
1391 }
1392
1393 /* Called from main thread */
sink_input_suspend_cb(pa_sink_input * i,pa_sink_state_t old_state,pa_suspend_cause_t old_suspend_cause)1394 static void sink_input_suspend_cb(pa_sink_input *i, pa_sink_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
1395 struct userdata *u;
1396 bool suspended;
1397
1398 pa_sink_input_assert_ref(i);
1399 pa_assert_ctl_context();
1400 pa_assert_se(u = i->userdata);
1401
1402 /* State has not changed, nothing to do */
1403 if (old_state == i->sink->state)
1404 return;
1405
1406 suspended = (i->sink->state == PA_SINK_SUSPENDED);
1407
1408 /* If the sink has been suspended, we need to handle this like
1409 * a sink change when the sink is resumed. Because the sink
1410 * is suspended, we can set the variables directly. */
1411 if (suspended) {
1412 u->output_thread_info.pop_called = false;
1413 u->output_thread_info.first_pop_done = false;
1414
1415 } else
1416 /* Set effective source latency on unsuspend */
1417 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1418
1419 pa_source_output_cork(u->source_output, suspended);
1420
1421 update_adjust_timer(u);
1422 }
1423
/* Latency range callback of the sink input. Called from output thread
 * context.
 *
 * Only notifies the main thread: LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED
 * is posted to the outq of the current thread message queue, addressed to
 * u->msg. The actual handling takes place in loopback_process_msg_cb(). */
static void update_sink_latency_range_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_sink_input_assert_io_context(i);
    pa_assert_se(u = i->userdata);

    /* Sink latency may have changed */
    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
}
1435
1436 /* Called from main context */
loopback_process_msg_cb(pa_msgobject * o,int code,void * userdata,int64_t offset,pa_memchunk * chunk)1437 static int loopback_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1438 struct loopback_msg *msg;
1439 struct userdata *u;
1440 pa_usec_t current_latency;
1441
1442 pa_assert(o);
1443 pa_assert_ctl_context();
1444
1445 msg = LOOPBACK_MSG(o);
1446
1447 /* If messages are processed after a module unload request, they
1448 * must be ignored. */
1449 if (msg->dead)
1450 return 0;
1451
1452 pa_assert_se(u = msg->userdata);
1453
1454 switch (code) {
1455
1456 case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:
1457
1458 update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1459 current_latency = pa_source_get_requested_latency(u->source_output->source);
1460 if (current_latency > u->configured_source_latency) {
1461 /* The minimum latency has changed to a value larger than the configured latency, so
1462 * the source latency has been increased. The case that the minimum latency changes
1463 * back to a smaller value is not handled because this never happens with the current
1464 * source implementations. */
1465 pa_log_warn("Source minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1466 u->configured_source_latency = current_latency;
1467 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1468 /* We re-start counting when the latency has changed */
1469 u->iteration_counter = 0;
1470 u->underrun_counter = 0;
1471 }
1472
1473 return 0;
1474
1475 case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:
1476
1477 current_latency = pa_sink_get_requested_latency(u->sink_input->sink);
1478 if (current_latency > u->configured_sink_latency) {
1479 /* The minimum latency has changed to a value larger than the configured latency, so
1480 * the sink latency has been increased. The case that the minimum latency changes back
1481 * to a smaller value is not handled because this never happens with the current sink
1482 * implementations. */
1483 pa_log_warn("Sink minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1484 u->configured_sink_latency = current_latency;
1485 update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1486 /* We re-start counting when the latency has changed */
1487 u->iteration_counter = 0;
1488 u->underrun_counter = 0;
1489 }
1490
1491 return 0;
1492
1493 case LOOPBACK_MESSAGE_UNDERRUN:
1494
1495 u->underrun_counter++;
1496 u->underrun_occured = true;
1497 u->target_latency_cross_counter = 0;
1498 pa_log_debug("Underrun detected, counter incremented to %u", u->underrun_counter);
1499
1500 return 0;
1501
1502 case LOOPBACK_MESSAGE_ADJUST_DONE:
1503
1504 u->initial_adjust_pending = false;
1505
1506 return 0;
1507
1508 }
1509
1510 return 0;
1511 }
1512
1513 /* Called from main thread */
sink_port_latency_offset_changed_cb(pa_core * core,pa_sink * sink,struct userdata * u)1514 static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_sink *sink, struct userdata *u) {
1515
1516 if (sink != u->sink_input->sink)
1517 return PA_HOOK_OK;
1518
1519 if (!u->sink_latency_offset_changed)
1520 u->last_sink_latency_offset = u->sink_latency_offset;
1521 u->sink_latency_offset_changed = true;
1522 u->sink_latency_offset = sink->port_latency_offset;
1523 update_minimum_latency(u, sink, true);
1524
1525 /* We might need to adjust again, reset counter */
1526 u->target_latency_cross_counter = 0;
1527
1528 return PA_HOOK_OK;
1529 }
1530
1531 /* Called from main thread */
source_port_latency_offset_changed_cb(pa_core * core,pa_source * source,struct userdata * u)1532 static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_source *source, struct userdata *u) {
1533
1534 if (source != u->source_output->source)
1535 return PA_HOOK_OK;
1536
1537 if (!u->source_latency_offset_changed)
1538 u->last_source_latency_offset = u->source_latency_offset;
1539 u->source_latency_offset_changed = true;
1540 u->source_latency_offset = source->port_latency_offset;
1541 update_minimum_latency(u, u->sink_input->sink, true);
1542
1543 /* We might need to adjust again, reset counter */
1544 u->target_latency_cross_counter = 0;
1545
1546 return PA_HOOK_OK;
1547 }
1548
/* Module initialization.
 *
 * Parses the module arguments, allocates the userdata, creates the source
 * output and the sink input (both start corked), sets up the memblockq, the
 * asyncmsgq, the port latency offset hooks and the main thread message
 * object, then puts both streams and uncorks them depending on the suspend
 * state of the peer device.
 *
 * Returns 0 on success. On any failure pa__done() is called to release
 * whatever has been set up so far and -1 is returned. */
int pa__init(pa_module *m) {
    pa_modargs *ma = NULL;
    struct userdata *u;
    pa_sink *sink = NULL;
    pa_sink_input_new_data sink_input_data;
    bool sink_dont_move;
    pa_source *source = NULL;
    pa_source_output_new_data source_output_data;
    bool source_dont_move;
    uint32_t latency_msec;
    uint32_t max_latency_msec;
    uint32_t fast_adjust_threshold;
    uint32_t adjust_threshold;
    pa_sample_spec ss;
    pa_channel_map map;
    bool format_set = false;
    bool rate_set = false;
    bool channels_set = false;
    pa_memchunk silence;
    double adjust_time_sec;
    double log_interval_sec;
    const char *n;
    bool remix = true;

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments");
        goto fail;
    }

    n = pa_modargs_get_value(ma, "source", NULL);
    if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
        pa_log("No such source.");
        goto fail;
    }

    n = pa_modargs_get_value(ma, "sink", NULL);
    if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
        pa_log("No such sink.");
        goto fail;
    }

    if (pa_modargs_get_value_boolean(ma, "remix", &remix) < 0) {
        pa_log("Invalid boolean remix parameter");
        goto fail;
    }

    /* Take the default stream format from the source if one was given,
     * otherwise from the sink. */
    if (source) {
        ss = source->sample_spec;
        map = source->channel_map;
        format_set = true;
        rate_set = true;
        channels_set = true;
    } else if (sink) {
        ss = sink->sample_spec;
        map = sink->channel_map;
        format_set = true;
        rate_set = true;
        channels_set = true;
    } else {
        /* FIXME: Dummy stream format, needed because pa_sink_input_new()
         * requires valid sample spec and channel map even when all the FIX_*
         * stream flags are specified. pa_sink_input_new() should be changed
         * to ignore the sample spec and channel map when the FIX_* flags are
         * present. */
        ss.format = PA_SAMPLE_U8;
        ss.rate = 8000;
        ss.channels = 1;
        map.channels = 1;
        map.map[0] = PA_CHANNEL_POSITION_MONO;
    }

    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        pa_log("Invalid sample format specification or channel map");
        goto fail;
    }

    if (ss.rate < 4000 || ss.rate > PA_RATE_MAX) {
        pa_log("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
        goto fail;
    }

    /* Explicitly given arguments always count as "set", so the matching
     * FIX_* flag is not requested for the source output below. */
    if (pa_modargs_get_value(ma, "format", NULL))
        format_set = true;

    if (pa_modargs_get_value(ma, "rate", NULL))
        rate_set = true;

    if (pa_modargs_get_value(ma, "channels", NULL) || pa_modargs_get_value(ma, "channel_map", NULL))
        channels_set = true;

    adjust_threshold = DEFAULT_ADJUST_THRESHOLD_USEC;
    if (pa_modargs_get_value_u32(ma, "adjust_threshold_usec", &adjust_threshold) < 0 || adjust_threshold < 1 || adjust_threshold > 10000) {
        /* Fatal for module loading, so log at the same level as the other
         * argument errors. */
        pa_log("Invalid adjust threshold specification");
        goto fail;
    }

    latency_msec = DEFAULT_LATENCY_MSEC;
    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 30000) {
        pa_log("Invalid latency specification");
        goto fail;
    }

    /* 0 is accepted as is, any other value must be at least 100 ms */
    fast_adjust_threshold = 0;
    if (pa_modargs_get_value_u32(ma, "fast_adjust_threshold_msec", &fast_adjust_threshold) < 0 || (fast_adjust_threshold != 0 && fast_adjust_threshold < 100)) {
        pa_log("Invalid fast adjust threshold specification");
        goto fail;
    }

    max_latency_msec = 0;
    if (pa_modargs_get_value_u32(ma, "max_latency_msec", &max_latency_msec) < 0) {
        pa_log("Invalid maximum latency specification");
        goto fail;
    }

    if (max_latency_msec > 0 && max_latency_msec < latency_msec) {
        pa_log_warn("Configured maximum latency is smaller than latency, using latency instead");
        max_latency_msec = latency_msec;
    }

    /* From here on m->userdata is set, so pa__done() performs real cleanup
     * on the failure path. */
    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->core = m->core;
    u->module = m;
    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
    u->max_latency = (pa_usec_t) max_latency_msec * PA_USEC_PER_MSEC;
    u->output_thread_info.pop_called = false;
    u->output_thread_info.pop_adjust = false;
    u->output_thread_info.push_called = false;
    u->iteration_counter = 0;
    u->underrun_counter = 0;
    u->underrun_latency_limit = 0;
    u->source_sink_changed = true;
    u->real_adjust_time_sum = 0;
    u->adjust_counter = 0;
    u->fast_adjust_threshold = fast_adjust_threshold * PA_USEC_PER_MSEC;
    u->underrun_occured = false;
    u->source_latency_offset_changed = false;
    u->sink_latency_offset_changed = false;
    u->latency_error = 0;
    u->adjust_threshold = adjust_threshold;
    u->target_latency_cross_counter = 0;
    u->initial_adjust_pending = true;

    adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
    if (pa_modargs_get_value_double(ma, "adjust_time", &adjust_time_sec) < 0) {
        pa_log("Failed to parse adjust_time value");
        goto fail;
    }

    /* Allow values >= 0.1 and also 0 which means no adjustment */
    if (adjust_time_sec < 0.1) {
        if (adjust_time_sec < 0 || adjust_time_sec > 0) {
            pa_log("Failed to parse adjust_time value");
            goto fail;
        }
    }

    u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
    u->real_adjust_time = u->adjust_time;

    /* Set up the source output. source_output_data owns resources from
     * here until pa_source_output_new_data_done() is called, so every
     * failure path in between has to release it. */
    pa_source_output_new_data_init(&source_output_data);
    source_output_data.driver = __FILE__;
    source_output_data.module = m;
    if (source)
        pa_source_output_new_data_set_source(&source_output_data, source, false, true);

    if (pa_modargs_get_proplist(ma, "source_output_properties", source_output_data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Failed to parse the source_output_properties value.");
        pa_source_output_new_data_done(&source_output_data);
        goto fail;
    }

    if (!pa_proplist_contains(source_output_data.proplist, PA_PROP_MEDIA_ROLE))
        pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");

    pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
    pa_source_output_new_data_set_channel_map(&source_output_data, &map);
    source_output_data.flags = PA_SOURCE_OUTPUT_START_CORKED;

    if (!remix)
        source_output_data.flags |= PA_SOURCE_OUTPUT_NO_REMIX;

    if (!format_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_FORMAT;

    if (!rate_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_RATE;

    if (!channels_set)
        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_CHANNELS;

    source_dont_move = false;
    if (pa_modargs_get_value_boolean(ma, "source_dont_move", &source_dont_move) < 0) {
        pa_log("source_dont_move= expects a boolean argument.");
        /* Release the new-data struct, it would be leaked otherwise */
        pa_source_output_new_data_done(&source_output_data);
        goto fail;
    }

    if (source_dont_move)
        source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;

    pa_source_output_new(&u->source_output, m->core, &source_output_data);
    pa_source_output_new_data_done(&source_output_data);

    if (!u->source_output)
        goto fail;

    u->source_output->parent.process_msg = source_output_process_msg_cb;
    u->source_output->push = source_output_push_cb;
    u->source_output->process_rewind = source_output_process_rewind_cb;
    u->source_output->kill = source_output_kill_cb;
    u->source_output->attach = source_output_attach_cb;
    u->source_output->detach = source_output_detach_cb;
    u->source_output->may_move_to = source_output_may_move_to_cb;
    u->source_output->moving = source_output_moving_cb;
    u->source_output->suspend = source_output_suspend_cb;
    u->source_output->update_source_latency_range = update_source_latency_range_cb;
    u->source_output->update_source_fixed_latency = update_source_latency_range_cb;
    u->source_output->userdata = u;

    /* If format, rate or channels were originally unset, they are set now
     * after the pa_source_output_new() call. */
    ss = u->source_output->sample_spec;
    map = u->source_output->channel_map;

    /* Get log interval, default is 0, which means no logging */
    log_interval_sec = 0;
    if (pa_modargs_get_value_double(ma, "log_interval", &log_interval_sec) < 0) {
        /* Fatal for module loading, so log at the same level as the other
         * argument errors. */
        pa_log("Invalid log interval specification");
        goto fail;
    }

    /* Allow values >= 0.1 and also 0 */
    if (log_interval_sec < 0.1) {
        if (log_interval_sec < 0 || log_interval_sec > 0) {
            pa_log("Failed to parse log_interval value");
            goto fail;
        }
    }

    /* Estimate number of iterations for logging. */
    u->log_interval = 0;
    if (u->adjust_time != 0 && log_interval_sec != 0) {
        u->log_interval = (int)(log_interval_sec * PA_USEC_PER_SEC / u->adjust_time + 0.5);
        /* Logging was specified, but log interval parameter was too small,
         * therefore log on every iteration */
        if (u->log_interval == 0)
            u->log_interval = 1;
    }
    u->log_counter = u->log_interval;

    /* Set up the sink input. As above, sink_input_data must be released on
     * every failure path until pa_sink_input_new_data_done() was called. */
    pa_sink_input_new_data_init(&sink_input_data);
    sink_input_data.driver = __FILE__;
    sink_input_data.module = m;

    if (sink)
        pa_sink_input_new_data_set_sink(&sink_input_data, sink, false, true);

    if (pa_modargs_get_proplist(ma, "sink_input_properties", sink_input_data.proplist, PA_UPDATE_REPLACE) < 0) {
        pa_log("Failed to parse the sink_input_properties value.");
        pa_sink_input_new_data_done(&sink_input_data);
        goto fail;
    }

    if (!pa_proplist_contains(sink_input_data.proplist, PA_PROP_MEDIA_ROLE))
        pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");

    pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
    pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;

    if (!remix)
        sink_input_data.flags |= PA_SINK_INPUT_NO_REMIX;

    sink_dont_move = false;
    if (pa_modargs_get_value_boolean(ma, "sink_dont_move", &sink_dont_move) < 0) {
        pa_log("sink_dont_move= expects a boolean argument.");
        /* Release the new-data struct, it would be leaked otherwise */
        pa_sink_input_new_data_done(&sink_input_data);
        goto fail;
    }

    if (sink_dont_move)
        sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;

    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
    pa_sink_input_new_data_done(&sink_input_data);

    if (!u->sink_input)
        goto fail;

    u->sink_input->parent.process_msg = sink_input_process_msg_cb;
    u->sink_input->pop = sink_input_pop_cb;
    u->sink_input->process_rewind = sink_input_process_rewind_cb;
    u->sink_input->kill = sink_input_kill_cb;
    u->sink_input->state_change = sink_input_state_change_cb;
    u->sink_input->attach = sink_input_attach_cb;
    u->sink_input->detach = sink_input_detach_cb;
    u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
    u->sink_input->update_max_request = sink_input_update_max_request_cb;
    u->sink_input->may_move_to = sink_input_may_move_to_cb;
    u->sink_input->moving = sink_input_moving_cb;
    u->sink_input->suspend = sink_input_suspend_cb;
    u->sink_input->update_sink_latency_range = update_sink_latency_range_cb;
    u->sink_input->update_sink_fixed_latency = update_sink_latency_range_cb;
    u->sink_input->userdata = u;

    u->last_source_latency_offset = u->source_output->source->port_latency_offset;
    u->last_sink_latency_offset = u->sink_input->sink->port_latency_offset;
    update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
    set_sink_input_latency(u, u->sink_input->sink);
    set_source_output_latency(u, u->source_output->source);

    pa_sink_input_get_silence(u->sink_input, &silence);
    u->memblockq = pa_memblockq_new(
        "module-loopback memblockq",
        0,                      /* idx */
        MEMBLOCKQ_MAXLENGTH,    /* maxlength */
        MEMBLOCKQ_MAXLENGTH,    /* tlength */
        &ss,                    /* sample_spec */
        0,                      /* prebuf */
        0,                      /* minreq */
        0,                      /* maxrewind */
        &silence);              /* silence frame */
    pa_memblock_unref(silence.memblock);
    /* Fill the memblockq with silence */
    pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);

    u->asyncmsgq = pa_asyncmsgq_new(0);
    if (!u->asyncmsgq) {
        pa_log("pa_asyncmsgq_new() failed.");
        goto fail;
    }

    /* Fill in media name and icon of each stream from the device on the
     * other end, unless the user already supplied them. */
    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME))
        pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
                         pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));

    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)
            && (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);

    if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME))
        pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
                         pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));

    if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)
            && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME)))
        pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);

    /* Hooks to track changes of latency offsets */
    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
                           PA_HOOK_NORMAL, (pa_hook_cb_t) sink_port_latency_offset_changed_cb, u);
    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
                           PA_HOOK_NORMAL, (pa_hook_cb_t) source_port_latency_offset_changed_cb, u);

    /* Setup message handler for main thread */
    u->msg = pa_msgobject_new(loopback_msg);
    u->msg->parent.process_msg = loopback_process_msg_cb;
    u->msg->userdata = u;
    u->msg->dead = false;

    /* The output thread is not yet running, set effective_source_latency directly */
    update_effective_source_latency(u, u->source_output->source, NULL);

    pa_sink_input_put(u->sink_input);
    pa_source_output_put(u->source_output);

    /* Each stream is only uncorked if the device on the other end is not
     * suspended. */
    if (u->source_output->source->state != PA_SOURCE_SUSPENDED)
        pa_sink_input_cork(u->sink_input, false);

    if (u->sink_input->sink->state != PA_SINK_SUSPENDED)
        pa_source_output_cork(u->source_output, false);

    update_adjust_timer(u);

    pa_modargs_free(ma);
    return 0;

fail:
    if (ma)
        pa_modargs_free(ma);

    /* Returns immediately if the userdata has not been allocated yet */
    pa__done(m);

    return -1;
}
1934
/* Module cleanup. Tears down the loopback and frees the userdata.
 * pa__init() also calls this on its failure path, so the userdata may be
 * missing entirely or only partially set up; each member is therefore
 * checked before it is released. */
void pa__done(pa_module *m) {
    struct userdata *u;

    pa_assert(m);

    u = m->userdata;
    if (!u)
        return;

    teardown(u);

    if (u->memblockq)
        pa_memblockq_free(u->memblockq);

    if (u->asyncmsgq)
        pa_asyncmsgq_unref(u->asyncmsgq);

    if (u->msg)
        loopback_msg_unref(u->msg);

    pa_xfree(u);
}
1956